You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 21:55:51 UTC

[1/2] incubator-beam git commit: Improve ReduceFnRunner prefetching

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9d380de96 -> c72708cd2


Improve ReduceFnRunner prefetching

- add prefetch* methods for prefetching state matching existing methods
- prefetch triggers in processElements
- replace onTimer with batched onTimers method to allow prefetching across
  timers

Additionally remove deprecated TimerCallback usage


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b044f3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b044f3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b044f3f

Branch: refs/heads/master
Commit: 2b044f3f315655b863dbc7fd298f33c196fb8ef7
Parents: 9d380de
Author: Sam Whittle <sa...@google.com>
Authored: Thu Nov 10 12:59:49 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 6 13:54:50 2016 -0800

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  11 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  48 +-
 .../beam/runners/core/PaneInfoTracker.java      |   4 +
 .../runners/core/ReduceFnContextFactory.java    |   9 +-
 .../beam/runners/core/ReduceFnRunner.java       | 493 ++++++++++++-------
 .../apache/beam/runners/core/WatermarkHold.java |   5 +
 .../triggers/TriggerStateMachineRunner.java     |  14 +-
 .../beam/runners/core/ReduceFnTester.java       |  77 ++-
 .../triggers/TriggerStateMachineTester.java     |  17 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   5 +-
 10 files changed, 440 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 2082269..14171b3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -72,9 +71,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
   @Override
   public void processElement(ProcessContext c) throws Exception {
-    KeyedWorkItem<K, InputT> element = c.element();
+    KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
 
-    K key = c.element().key();
+    K key = keyedWorkItem.key();
     TimerInternals timerInternals = c.windowingInternals().timerInternals();
     StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
 
@@ -92,10 +91,8 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
             reduceFn,
             c.getPipelineOptions());
 
-    reduceFnRunner.processElements(element.elementsIterable());
-    for (TimerData timer : element.timersIterable()) {
-      reduceFnRunner.onTimer(timer);
-    }
+    reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
+    reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
     reduceFnRunner.persist();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index b4b366c..9189191 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -18,17 +18,18 @@
 package org.apache.beam.runners.core;
 
 import com.google.common.collect.Iterables;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.util.state.TimerCallback;
 import org.joda.time.Instant;
 
 /**
@@ -59,9 +60,8 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
     // timer manager from the context because it doesn't exist. So we create one and emulate the
     // watermark, knowing that we have all data and it is in timestamp order.
     InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
-    timerInternals.advanceProcessingTime(TimerCallback.NO_OP, Instant.now());
-    timerInternals.advanceSynchronizedProcessingTime(
-        TimerCallback.NO_OP, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    timerInternals.advanceProcessingTime(Instant.now());
+    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
     StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
@@ -85,22 +85,50 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
       reduceFnRunner.processElements(chunk);
 
       // Then, since elements are sorted by their timestamp, advance the input watermark
-      // to the first element, and fire any timers that may have been scheduled.
-      timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp());
+      // to the first element.
+      timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
+      // Advance the processing times.
+      timerInternals.advanceProcessingTime(Instant.now());
+      timerInternals.advanceSynchronizedProcessingTime(Instant.now());
 
-      // Fire any processing timers that need to fire
-      timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
+      // Fire all the eligible timers.
+      fireEligibleTimers(timerInternals, reduceFnRunner);
 
       // Leave the output watermark undefined. Since there's no late data in batch mode
       // there's really no need to track it as we do for streaming.
     }
 
     // Finish any pending windows by advancing the input watermark to infinity.
-    timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Finally, advance the processing time to infinity to fire any timers.
-    timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    fireEligibleTimers(timerInternals, reduceFnRunner);
 
     reduceFnRunner.persist();
   }
+
+  private void fireEligibleTimers(InMemoryTimerInternals timerInternals,
+      ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner) throws Exception {
+    List<TimerInternals.TimerData> timers = new ArrayList<>();
+    while (true) {
+        TimerInternals.TimerData timer;
+        while ((timer = timerInternals.removeNextEventTimer()) != null) {
+          timers.add(timer);
+        }
+        while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+          timers.add(timer);
+        }
+        while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+          timers.add(timer);
+        }
+        if (timers.isEmpty()) {
+          break;
+        }
+        reduceFnRunner.onTimers(timers);
+        timers.clear();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 8140243..69a4cfd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -54,6 +54,10 @@ public class PaneInfoTracker {
     state.access(PANE_INFO_TAG).clear();
   }
 
+  public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) {
+    context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater();
+  }
+
   /**
    * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
    * info includes the timing for the pane, who's calculation is quite subtle.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 539126a..c5bda9b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateAccessor;
 import org.apache.beam.sdk.util.state.StateContext;
@@ -117,7 +116,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
   }
 
   public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
-      ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
+      PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
     return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
   }
 
@@ -389,11 +388,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
   private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
     private final StateAccessorImpl<K, W> state;
-    private final ReadableState<PaneInfo> pane;
+    private final PaneInfo pane;
     private final OnTriggerCallbacks<OutputT> callbacks;
     private final TimersImpl timers;
 
-    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
+    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane,
         OnTriggerCallbacks<OutputT> callbacks) {
       reduceFn.super();
       this.state = state;
@@ -424,7 +423,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
     @Override
     public PaneInfo paneInfo() {
-      return pane.read();
+      return pane;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index a686f46..6f7bbcf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -21,12 +21,15 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,10 +61,8 @@ import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.TimerCallback;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -91,7 +92,7 @@ import org.joda.time.Instant;
  * @param <OutputT> The output type that will be produced for each key.
  * @param <W> The type of windows this operates on.
  */
-public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> implements TimerCallback {
+public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
 
   /**
    * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
@@ -268,6 +269,32 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     return activeWindows.getActiveAndNewWindows().isEmpty();
   }
 
+  private Set<W> openWindows(Collection<W> windows) {
+    Set<W> result = new HashSet<>();
+    for (W window : windows) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(
+          window, StateStyle.DIRECT);
+      if (!triggerRunner.isClosed(directContext.state())) {
+        result.add(window);
+      }
+    }
+    return result;
+  }
+
+  private Collection<W> windowsThatShouldFire(Set<W> windows) throws Exception {
+    Collection<W> result = new ArrayList<>();
+    // Filter out timers that didn't trigger.
+    for (W window : windows) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext =
+          contextFactory.base(window, StateStyle.DIRECT);
+      if (triggerRunner.shouldFire(
+          directContext.window(), directContext.timers(), directContext.state())) {
+        result.add(window);
+      }
+    }
+    return result;
+  }
+
   /**
    * Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
    * triggers, and window merging.
@@ -293,25 +320,54 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
    * </ol>
    */
   public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
+    if (!values.iterator().hasNext()) {
+      return;
+    }
+
+    // Determine all the windows for elements.
+    Set<W> windows = collectWindows(values);
     // If an incoming element introduces a new window, attempt to merge it into an existing
     // window eagerly.
-    Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
+    Map<W, W> windowToMergeResult = mergeWindows(windows);
+    if (!windowToMergeResult.isEmpty()) {
+      // Update windows by removing all windows that were merged away and adding
+      // the windows they were merged to. We add after completing all the
+      // removals to avoid removing a window that was also added.
+      List<W> addedWindows = new ArrayList<>(windowToMergeResult.size());
+      for (Map.Entry<W, W> entry : windowToMergeResult.entrySet()) {
+        windows.remove(entry.getKey());
+        addedWindows.add(entry.getValue());
+      }
+      windows.addAll(addedWindows);
+    }
 
-    Set<W> windowsToConsider = new HashSet<>();
+    prefetchWindowsForValues(windows);
 
-    // Process each element, using the updated activeWindows determined by collectAndMergeWindows.
+    // All windows that are open before element processing may need to fire.
+    Set<W> windowsToConsider = openWindows(windows);
+
+    // Process each element, using the updated activeWindows determined by mergeWindows.
     for (WindowedValue<InputT> value : values) {
-      windowsToConsider.addAll(processElement(windowToMergeResult, value));
+      processElement(windowToMergeResult, value);
     }
 
-    // Trigger output from any window for which the trigger is ready
+    // Now that we've processed the elements, see if any of the windows need to fire.
+    // Prefetch state necessary to determine if the triggers should fire.
     for (W mergedWindow : windowsToConsider) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext =
-          contextFactory.base(mergedWindow, StateStyle.DIRECT);
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
-          contextFactory.base(mergedWindow, StateStyle.RENAMED);
-      triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
-      emitIfAppropriate(directContext, renamedContext);
+      triggerRunner.prefetchShouldFire(
+          mergedWindow, contextFactory.base(mergedWindow, StateStyle.DIRECT).state());
+    }
+    // Filter to windows that are firing.
+    Collection<W> windowsToFire = windowsThatShouldFire(windowsToConsider);
+    // Prefetch windows that are firing.
+    for (W window : windowsToFire) {
+      prefetchEmit(contextFactory.base(window, StateStyle.DIRECT),
+          contextFactory.base(window, StateStyle.RENAMED));
+    }
+    // Trigger output from firing windows.
+    for (W window : windowsToFire) {
+      emit(contextFactory.base(window, StateStyle.DIRECT),
+          contextFactory.base(window, StateStyle.RENAMED));
     }
 
     // We're all done with merging and emitting elements so can compress the activeWindow state.
@@ -325,52 +381,61 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
   }
 
   /**
-   * Extract the windows associated with the values, and invoke merge. Return a map
-   * from windows to the merge result window. If a window is not in the domain of
-   * the result map then it did not get merged into a different window.
+   * Extract the windows associated with the values.
    */
-  private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values)
-      throws Exception {
-    // No-op if no merging can take place
+  private Set<W> collectWindows(Iterable<WindowedValue<InputT>> values) throws Exception {
+    Set<W> windows = new HashSet<>();
+    for (WindowedValue<?> value : values) {
+      for (BoundedWindow untypedWindow : value.getWindows()) {
+        @SuppressWarnings("unchecked")
+          W window = (W) untypedWindow;
+        windows.add(window);
+      }
+    }
+    return windows;
+  }
+
+  /**
+   * Invoke merge for the given windows and return a map from windows to the
+   * merge result window. Windows that were not merged are not present in the
+   * map.
+   */
+  private Map<W, W> mergeWindows(Set<W> windows) throws Exception {
     if (windowingStrategy.getWindowFn().isNonMerging()) {
-      return ImmutableMap.of();
+      // Return an empty map, indicating that every window is not merged.
+      return Collections.emptyMap();
     }
 
+    Map<W, W> windowToMergeResult = new HashMap<>();
     // Collect the windows from all elements (except those which are too late) and
     // make sure they are already in the active window set or are added as NEW windows.
-    for (WindowedValue<?> value : values) {
-      for (BoundedWindow untypedWindow : value.getWindows()) {
-        @SuppressWarnings("unchecked")
-        W window = (W) untypedWindow;
-
-        // For backwards compat with pre 1.4 only.
-        // We may still have ACTIVE windows with multiple state addresses, representing
-        // a window who's state has not yet been eagerly merged.
-        // We'll go ahead and merge that state now so that we don't have to worry about
-        // this legacy case anywhere else.
-        if (activeWindows.isActive(window)) {
-          Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
-          if (stateAddressWindows.size() > 1) {
-            // This is a legacy window who's state has not been eagerly merged.
-            // Do that now.
-            ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
-                contextFactory.forPremerge(window);
-            reduceFn.onMerge(premergeContext);
-            watermarkHold.onMerge(premergeContext);
-            activeWindows.merged(window);
-          }
+    for (W window : windows) {
+      // For backwards compat with pre 1.4 only.
+      // We may still have ACTIVE windows with multiple state addresses, representing
+      // a window who's state has not yet been eagerly merged.
+      // We'll go ahead and merge that state now so that we don't have to worry about
+      // this legacy case anywhere else.
+      if (activeWindows.isActive(window)) {
+        Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
+        if (stateAddressWindows.size() > 1) {
+          // This is a legacy window who's state has not been eagerly merged.
+          // Do that now.
+          ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
+              contextFactory.forPremerge(window);
+          reduceFn.onMerge(premergeContext);
+          watermarkHold.onMerge(premergeContext);
+          activeWindows.merged(window);
         }
-
-        // Add this window as NEW if it is not currently ACTIVE.
-        // If we had already seen this window and closed its trigger, then the
-        // window will not be currently ACTIVE. It will then be added as NEW here,
-        // and fall into the merging logic as usual.
-        activeWindows.ensureWindowExists(window);
       }
+
+      // Add this window as NEW if it is not currently ACTIVE.
+      // If we had already seen this window and closed its trigger, then the
+      // window will not be currently ACTIVE. It will then be added as NEW here,
+      // and fall into the merging logic as usual.
+      activeWindows.ensureWindowExists(window);
     }
 
     // Merge all of the active windows and retain a mapping from source windows to result windows.
-    Map<W, W> windowToMergeResult = new HashMap<>();
     activeWindows.merge(new OnMergeCallback(windowToMergeResult));
     return windowToMergeResult;
   }
@@ -472,38 +537,50 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
   }
 
   /**
-   * Process an element.
-   *
-   * @param value the value being processed
-   * @return the set of windows in which the element was actually processed
+   * Redirect element windows to the ACTIVE windows they have been merged into.
+   * The compressed representation (value, {window1, window2, ...}) actually represents
+   * distinct elements (value, window1), (value, window2), ...
+   * so if window1 and window2 merge, the resulting window will contain both copies
+   * of the value.
    */
-  private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
-      throws Exception {
-    // Redirect element windows to the ACTIVE windows they have been merged into.
-    // The compressed representation (value, {window1, window2, ...}) actually represents
-    // distinct elements (value, window1), (value, window2), ...
-    // so if window1 and window2 merge, the resulting window will contain both copies
-    // of the value.
-    Collection<W> windows = new ArrayList<>();
-    for (BoundedWindow untypedWindow : value.getWindows()) {
-      @SuppressWarnings("unchecked")
-      W window = (W) untypedWindow;
-      W mergeResult = windowToMergeResult.get(window);
-      if (mergeResult == null) {
-        mergeResult = window;
-      }
-      windows.add(mergeResult);
-    }
+  private ImmutableSet<W> toMergedWindows(final Map<W, W> windowToMergeResult,
+      final Collection<? extends BoundedWindow> windows) {
+    return ImmutableSet.copyOf(
+        FluentIterable.from(windows).transform(
+            new Function<BoundedWindow, W>() {
+              @Override
+              public W apply(BoundedWindow untypedWindow) {
+                @SuppressWarnings("unchecked")
+                W window = (W) untypedWindow;
+                W mergedWindow = windowToMergeResult.get(window);
+                // If the element is not present in the map, the window is unmerged.
+                return (mergedWindow == null) ? window : mergedWindow;
+              }
+            }
+        ));
+  }
 
+  private void prefetchWindowsForValues(Collection<W> windows) {
     // Prefetch in each of the windows if we're going to need to process triggers
     for (W window : windows) {
-      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
-          window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(
+          window, StateStyle.DIRECT);
       triggerRunner.prefetchForValue(window, directContext.state());
     }
+  }
+
+  /**
+   * Process an element.
+   *
+   * @param windowToMergeResult map of windows to merged windows. If a window is
+   * not present it is unmerged.
+   * @param value the value being processed
+   */
+  private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
+      throws Exception {
+    ImmutableSet<W> windows = toMergedWindows(windowToMergeResult, value.getWindows());
 
     // Process the element for each (mergeResultWindow, not closed) window it belongs to.
-    List<W> triggerableWindows = new ArrayList<>(windows.size());
     for (W window : windows) {
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
@@ -518,7 +595,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
         continue;
       }
 
-      triggerableWindows.add(window);
       activeWindows.ensureWindowIsActive(window);
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
@@ -562,102 +638,152 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
       // cannot take a trigger state from firing to non-firing.
       // (We don't actually assert this since it is too slow.)
     }
-
-    return triggerableWindows;
   }
 
   /**
-   * Called when an end-of-window, garbage collection, or trigger-specific timer fires.
+   * Enriches TimerData with state necessary for processing a timer as well as
+   * common queries about a timer.
    */
-  public void onTimer(TimerData timer) throws Exception {
-    // Which window is the timer for?
-    checkArgument(timer.getNamespace() instanceof WindowNamespace,
-        "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
-    @SuppressWarnings("unchecked")
-    WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
-    W window = windowNamespace.getWindow();
-    ReduceFn<K, InputT, OutputT, W>.Context directContext =
-        contextFactory.base(window, StateStyle.DIRECT);
-    ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
-        contextFactory.base(window, StateStyle.RENAMED);
+  private class EnrichedTimerData {
+    public final Instant timestamp;
+    public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
+    public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
+    // If this is an end-of-window timer then we may need to set a garbage collection timer
+    // if allowed lateness is non-zero.
+    public final boolean isEndOfWindow;
+    // If this is a garbage collection timer then we should trigger and
+    // garbage collect the window.  We'll consider any timer at or after the
+    // end-of-window time to be a signal to garbage collect.
+    public final boolean isGarbageCollection;
+
+    EnrichedTimerData(
+        TimerData timer,
+        ReduceFn<K, InputT, OutputT, W>.Context directContext,
+        ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+      this.timestamp = timer.getTimestamp();
+      this.directContext = directContext;
+      this.renamedContext = renamedContext;
+      W window = directContext.window();
+      this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+          && timer.getTimestamp().equals(window.maxTimestamp());
+      Instant cleanupTime = garbageCollectionTime(window);
+      this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
+    }
 
     // Has this window had its trigger finish?
     // - The trigger may implement isClosed as constant false.
     // - If the window function does not support windowing then all windows will be considered
     // active.
     // So we must take conjunction of activeWindows and triggerRunner state.
-    boolean windowIsActiveAndOpen =
-        activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
+    public boolean windowIsActiveAndOpen() {
+      return activeWindows.isActive(directContext.window())
+          && !triggerRunner.isClosed(directContext.state());
+    }
+  }
 
-    if (!windowIsActiveAndOpen) {
-      WindowTracing.debug(
-          "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
+  public void onTimers(Iterable<TimerData> timers) throws Exception {
+    if (!timers.iterator().hasNext()) {
+      return;
     }
 
-    // If this is an end-of-window timer then we may need to set a garbage collection timer
-    // if allowed lateness is non-zero.
-    boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-        && timer.getTimestamp().equals(window.maxTimestamp());
-
-    // If this is a garbage collection timer then we should trigger and garbage collect the window.
-    // We'll consider any timer at or after the end-of-window time to be a signal to garbage
-    // collect.
-    Instant cleanupTime = garbageCollectionTime(window);
-    boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
-        && !timer.getTimestamp().isBefore(cleanupTime);
-
-    if (isGarbageCollection) {
-      WindowTracing.debug(
-          "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
-          + "inputWatermark:{}; outputWatermark:{}",
-          key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
-          timerInternals.currentOutputWatermarkTime());
-
-      if (windowIsActiveAndOpen) {
-        // We need to call onTrigger to emit the final pane if required.
-        // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
-        // and the watermark has passed the end of the window.
-        @Nullable Instant newHold =
-            onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
-        checkState(newHold == null,
-            "Hold placed at %s despite isFinished being true.", newHold);
+    // Create a reusable context for each timer and begin prefetching necessary
+    // state.
+    List<EnrichedTimerData> enrichedTimers = new LinkedList();
+    for (TimerData timer : timers) {
+      checkArgument(timer.getNamespace() instanceof WindowNamespace,
+          "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
+      @SuppressWarnings("unchecked")
+        WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
+      W window = windowNamespace.getWindow();
+      ReduceFn<K, InputT, OutputT, W>.Context directContext =
+          contextFactory.base(window, StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+          contextFactory.base(window, StateStyle.RENAMED);
+      EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext);
+      enrichedTimers.add(enrichedTimer);
+
+      // Perform prefetching of state to determine if the trigger should fire.
+      if (enrichedTimer.isGarbageCollection) {
+        triggerRunner.prefetchIsClosed(directContext.state());
+      } else {
+        triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
       }
+    }
 
-      // Cleanup flavor B: Clear all the remaining state for this window since we'll never
-      // see elements for it again.
-      clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
-    } else {
-      WindowTracing.debug(
-          "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
-          + "inputWatermark:{}; outputWatermark:{}",
-          key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
-          timerInternals.currentOutputWatermarkTime());
-      if (windowIsActiveAndOpen) {
-        emitIfAppropriate(directContext, renamedContext);
+    // For those windows that are active and open, prefetch the triggering or emitting state.
+    for (EnrichedTimerData timer : enrichedTimers) {
+      if (timer.windowIsActiveAndOpen()) {
+        ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
+        if (timer.isGarbageCollection) {
+          prefetchOnTrigger(directContext, timer.renamedContext);
+        } else if (triggerRunner.shouldFire(
+            directContext.window(), directContext.timers(), directContext.state())) {
+          prefetchEmit(directContext, timer.renamedContext);
+        }
       }
+    }
 
-      if (isEndOfWindow) {
-        // If the window strategy trigger includes a watermark trigger then at this point
-        // there should be no data holds, either because we'd already cleared them on an
-        // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
-        // We could assert this but it is very expensive.
-
-        // Since we are processing an on-time firing we should schedule the garbage collection
-        // timer. (If getAllowedLateness is zero then the timer event will be considered a
-        // cleanup event and handled by the above).
-        // Note we must do this even if the trigger is finished so that we are sure to cleanup
-        // any final trigger finished bits.
-        checkState(
-            windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
-            "Unexpected zero getAllowedLateness");
-        WindowTracing.debug(
-            "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
-            + "inputWatermark:{}; outputWatermark:{}",
-            key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
+    // Perform processing now that everything is prefetched.
+    for (EnrichedTimerData timer : enrichedTimers) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext;
+
+      if (timer.isGarbageCollection) {
+        WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
+                + "inputWatermark:{}; outputWatermark:{}",
+            key, directContext.window(), timer.timestamp,
+            timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
-        checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-                                 "Cleanup time %s is beyond end-of-time", cleanupTime);
-        directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+
+        boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
+        if (windowIsActiveAndOpen) {
+          // We need to call onTrigger to emit the final pane if required.
+          // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
+          // and the watermark has passed the end of the window.
+          @Nullable
+          Instant newHold = onTrigger(
+              directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow);
+          checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
+        }
+
+        // Cleanup flavor B: Clear all the remaining state for this window since we'll never
+        // see elements for it again.
+        clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
+      } else {
+        WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
+                + "inputWatermark:{}; outputWatermark:{}",
+            key, directContext.window(), timer.timestamp,
+            timerInternals.currentInputWatermarkTime(),
+            timerInternals.currentOutputWatermarkTime());
+        if (timer.windowIsActiveAndOpen()
+            && triggerRunner.shouldFire(
+                   directContext.window(), directContext.timers(), directContext.state())) {
+          emit(directContext, renamedContext);
+        }
+
+        if (timer.isEndOfWindow) {
+          // If the window strategy trigger includes a watermark trigger then at this point
+          // there should be no data holds, either because we'd already cleared them on an
+          // earlier onTrigger, or because we just cleared them on the above emit.
+          // We could assert this but it is very expensive.
+
+          // Since we are processing an on-time firing we should schedule the garbage collection
+          // timer. (If getAllowedLateness is zero then the timer event will be considered a
+          // cleanup event and handled by the above).
+          // Note we must do this even if the trigger is finished so that we are sure to cleanup
+          // any final trigger finished bits.
+          checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
+              "Unexpected zero getAllowedLateness");
+          Instant cleanupTime = garbageCollectionTime(directContext.window());
+          WindowTracing.debug(
+              "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
+                  + "inputWatermark:{}; outputWatermark:{}",
+              key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
+              timerInternals.currentOutputWatermarkTime());
+          checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+              "Cleanup time %s is beyond end-of-time", cleanupTime);
+          directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+        }
       }
     }
   }
@@ -666,7 +792,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
    * Clear all the state associated with {@code context}'s window.
    * Should only be invoked if we know all future elements for this window will be considered
    * beyond allowed lateness.
-   * This is a superset of the clearing done by {@link #emitIfAppropriate} below since:
+   * This is a superset of the clearing done by {@link #emit} below since:
    * <ol>
    * <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is
    * closed again.
@@ -692,10 +818,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     } else {
       // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
       // For (1), if !activeWindows.isActive then the window must be merging and has been
-      // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired
+      // explicitly removed by emit. But in that case the trigger must have fired
       // and been closed, so this case reduces to (2).
       // For (2), if triggerRunner.isClosed then the trigger was fired and entered the
-      // closed state. In that case emitIfAppropriate will have cleared all state in
+      // closed state. In that case emit will have cleared all state in
       // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows.
       // We also know nonEmptyPanes must have been unconditionally cleared by the trigger.
       // Since the trigger fired the existing watermark holds must have been cleared, and since
@@ -737,17 +863,23 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     return false;
   }
 
+  private void prefetchEmit(ReduceFn<K, InputT, OutputT, W>.Context directContext,
+                                ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+    triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
+    triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
+    triggerRunner.prefetchIsClosed(directContext.state());
+    prefetchOnTrigger(directContext, renamedContext);
+  }
+
   /**
-   * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
+   * Emit if a trigger is ready to fire or timers require it, and cleanup state.
    */
-  private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
+  private void emit(
+      ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
       throws Exception {
-    if (!triggerRunner.shouldFire(
-        directContext.window(), directContext.timers(), directContext.state())) {
-      // Ignore unless trigger is ready to fire
-      return;
-    }
+    checkState(triggerRunner.shouldFire(
+        directContext.window(), directContext.timers(), directContext.state()));
 
     // Inform the trigger of the transition to see if it is finished
     triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
@@ -782,7 +914,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
   }
 
   /**
-   * Do we need to emit a pane?
+   * Do we need to emit?
    */
   private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
     if (!isEmpty) {
@@ -800,6 +932,15 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     return false;
   }
 
+  private void prefetchOnTrigger(
+      final ReduceFn<K, InputT, OutputT, W>.Context directContext,
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+    paneInfoTracker.prefetchPaneInfo(directContext);
+    watermarkHold.prefetchExtract(renamedContext);
+    nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
+    reduceFn.prefetchOnTrigger(directContext.state());
+  }
+
   /**
    * Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
    *
@@ -813,25 +954,17 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
           throws Exception {
     Instant inputWM = timerInternals.currentInputWatermarkTime();
 
-    // Prefetch necessary states
-    ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
-        watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
-    ReadableState<PaneInfo> paneFuture =
-        paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
-    ReadableState<Boolean> isEmptyFuture =
-        nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
-
-    reduceFn.prefetchOnTrigger(directContext.state());
-    triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
-
     // Calculate the pane info.
-    final PaneInfo pane = paneFuture.read();
-    // Extract the window hold, and as a side effect clear it.
+    final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
 
-    WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+    // Extract the window hold, and as a side effect clear it.
+    final WatermarkHold.OldAndNewHolds pair =
+        watermarkHold.extractAndRelease(renamedContext, isFinished).read();
     final Instant outputTimestamp = pair.oldHold;
     @Nullable Instant newHold = pair.newHold;
 
+    final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
+
     if (newHold != null) {
       // We can't be finished yet.
       checkState(
@@ -863,11 +996,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     }
 
     // Only emit a pane if it has data or empty panes are observable.
-    if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
+    if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
       // Run reduceFn.onTrigger method.
       final List<W> windows = Collections.singletonList(directContext.window());
       ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
-          contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED,
+          contextFactory.forTrigger(directContext.window(), pane, StateStyle.RENAMED,
               new OnTriggerCallbacks<OutputT>() {
                 @Override
                 public void output(OutputT toOutput) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 3c04571..7f1afcc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -444,6 +444,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     }
   }
 
+  public void prefetchExtract(final ReduceFn<?, ?, ?, W>.Context context) {
+    context.state().access(elementHoldTag).readLater();
+    context.state().access(EXTRA_HOLD_TAG).readLater();
+  }
+
   /**
    * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
    * reading, but add/restore an end-of-window or garbage collection hold if required.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index 9f03216..2f277eb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -99,25 +99,25 @@ public class TriggerStateMachineRunner<W extends BoundedWindow> {
     return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
   }
 
-  public void prefetchForValue(W window, StateAccessor<?> state) {
+  public void prefetchIsClosed(StateAccessor<?> state) {
     if (isFinishedSetNeeded()) {
       state.access(FINISHED_BITS_TAG).readLater();
     }
+  }
+
+  public void prefetchForValue(W window, StateAccessor<?> state) {
+    prefetchIsClosed(state);
     rootTrigger.getSpec().prefetchOnElement(
         contextFactory.createStateAccessor(window, rootTrigger));
   }
 
   public void prefetchOnFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
+    prefetchIsClosed(state);
     rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
   }
 
   public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
+    prefetchIsClosed(state);
     rootTrigger.getSpec().prefetchShouldFire(
         contextFactory.createStateAccessor(window, rootTrigger));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 337be23..db0cf91 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -78,7 +78,6 @@ import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.TimerCallback;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -100,7 +99,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   private final TestInMemoryStateInternals<String> stateInternals =
       new TestInMemoryStateInternals<>(KEY);
-  private final TestTimerInternals timerInternals = new TestTimerInternals();
+  private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
 
   private final WindowFn<Object, W> windowFn;
   private final TestOutputWindowedValue testOutputter;
@@ -443,8 +442,29 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    * fire. Then advance the output watermark as far as possible.
    */
   public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+    timerInternals.advanceInputWatermark(newInputWatermark);
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    timerInternals.advanceInputWatermark(runner, newInputWatermark);
+    while (true) {
+      TimerData timer;
+      List<TimerInternals.TimerData> timers = new ArrayList<>();
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      runner.onTimers(timers);
+    }
+    if (autoAdvanceOutputWatermark) {
+      Instant hold = stateInternals.earliestWatermarkHold();
+      if (hold == null) {
+        WindowTracing.trace(
+            "TestInMemoryTimerInternals.advanceInputWatermark: no holds, "
+                + "so output watermark = input watermark");
+        hold = timerInternals.currentInputWatermarkTime();
+      }
+      advanceOutputWatermark(hold);
+    }
     runner.persist();
   }
 
@@ -458,8 +478,19 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   /** Advance the processing time to the specified time, firing any timers that should fire. */
   public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+    timerInternals.advanceProcessingTime(newProcessingTime);
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    timerInternals.advanceProcessingTime(runner, newProcessingTime);
+    while (true) {
+      TimerData timer;
+      List<TimerInternals.TimerData> timers = new ArrayList<>();
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      runner.onTimers(timers);
+    }
     runner.persist();
   }
 
@@ -467,9 +498,21 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    * Advance the synchronized processing time to the specified time,
    * firing any timers that should fire.
    */
-  public void advanceSynchronizedProcessingTime(Instant newProcessingTime) throws Exception {
+  public void advanceSynchronizedProcessingTime(
+      Instant newSynchronizedProcessingTime) throws Exception {
+    timerInternals.advanceSynchronizedProcessingTime(newSynchronizedProcessingTime);
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    timerInternals.advanceSynchronizedProcessingTime(runner, newProcessingTime);
+    while (true) {
+      TimerData timer;
+      List<TimerInternals.TimerData> timers = new ArrayList<>();
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      runner.onTimers(timers);
+    }
     runner.persist();
   }
 
@@ -509,8 +552,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    runner.onTimer(
+    ArrayList timers = new ArrayList(1);
+    timers.add(
         TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
+    runner.onTimers(timers);
     runner.persist();
   }
 
@@ -601,22 +646,4 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
       return sum;
     }
   }
-
-  private class TestTimerInternals extends InMemoryTimerInternals {
-    @Override
-    public void advanceInputWatermark(TimerCallback timerCallback, Instant newInputWatermark)
-        throws Exception {
-      super.advanceInputWatermark(timerCallback, newInputWatermark);
-      if (autoAdvanceOutputWatermark) {
-        Instant hold = stateInternals.earliestWatermarkHold();
-        if (hold == null) {
-          WindowTracing.trace(
-              "TestInMemoryTimerInternals.advanceInputWatermark: no holds, "
-                  + "so output watermark = input watermark");
-          hold = currentInputWatermarkTime();
-        }
-        advanceOutputWatermark(hold);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 1ccca17..ed5ce9c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
 import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.TimerCallback;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -221,14 +220,22 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
    * possible.
    */
   public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
-    // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
-    timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark);
+    timerInternals.advanceInputWatermark(newInputWatermark);
+    while (timerInternals.removeNextEventTimer() != null) {
+      // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+    }
   }
 
   /** Advance the processing time to the specified time. */
   public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
-    // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
-    timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime);
+    timerInternals.advanceProcessingTime(newProcessingTime);
+    while (timerInternals.removeNextProcessingTimer() != null) {
+      // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+    }
+    timerInternals.advanceSynchronizedProcessingTime(newProcessingTime);
+    while (timerInternals.removeNextSynchronizedProcessingTimer() != null) {
+      // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 5c6b2c1..87cbbcd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -201,9 +200,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       // Drop any elements within expired windows
       reduceFnRunner.processElements(
           dropExpiredWindows(key, workItem.elementsIterable(), timerInternals));
-      for (TimerData timer : workItem.timersIterable()) {
-        reduceFnRunner.onTimer(timer);
-      }
+      reduceFnRunner.onTimers(workItem.timersIterable());
       reduceFnRunner.persist();
     }
 


[2/2] incubator-beam git commit: This closes #1519

Posted by ke...@apache.org.
This closes #1519


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c72708cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c72708cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c72708cd

Branch: refs/heads/master
Commit: c72708cd2c5725a739a4198bcede1560812bb5af
Parents: 9d380de 2b044f3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 13:55:32 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 6 13:55:32 2016 -0800

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  11 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  48 +-
 .../beam/runners/core/PaneInfoTracker.java      |   4 +
 .../runners/core/ReduceFnContextFactory.java    |   9 +-
 .../beam/runners/core/ReduceFnRunner.java       | 493 ++++++++++++-------
 .../apache/beam/runners/core/WatermarkHold.java |   5 +
 .../triggers/TriggerStateMachineRunner.java     |  14 +-
 .../beam/runners/core/ReduceFnTester.java       |  77 ++-
 .../triggers/TriggerStateMachineTester.java     |  17 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   5 +-
 10 files changed, 440 insertions(+), 243 deletions(-)
----------------------------------------------------------------------