You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/06 04:10:45 UTC

[2/3] incubator-beam git commit: Check for closed windows post-merging rather than pre-merging. Make sure we garbage collect NEW windows which end up being for closed windows. Add unit tests to confirm.

Check for closed windows post-merging rather than pre-merging.
Make sure we garbage collect NEW windows which end up being for closed windows.
Add unit tests to confirm.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c22a05c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c22a05c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c22a05c

Branch: refs/heads/master
Commit: 2c22a05c58b7f1ac9cbb86bbd9f3f4f9fda9bc39
Parents: 99c0fa7
Author: Mark Shields <ma...@google.com>
Authored: Thu Mar 31 11:36:01 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Apr 5 19:07:10 2016 -0700

----------------------------------------------------------------------
 .../dataflow/sdk/util/ActiveWindowSet.java      |  23 +++-
 .../sdk/util/MergingActiveWindowSet.java        | 133 +++++++++++--------
 .../sdk/util/NonMergingActiveWindowSet.java     |  11 +-
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java |  65 +++++----
 .../sdk/util/MergingActiveWindowSetTest.java    |  23 ++--
 .../dataflow/sdk/util/ReduceFnRunnerTest.java   | 122 ++++++++++++++++-
 .../cloud/dataflow/sdk/util/ReduceFnTester.java |   4 +
 .../cloud/dataflow/sdk/util/TriggerTester.java  |   2 +-
 8 files changed, 281 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
index 9476254..1782ae3 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
@@ -20,6 +20,7 @@ package com.google.cloud.dataflow.sdk.util;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Set;
 
@@ -94,9 +95,10 @@ public interface ActiveWindowSet<W extends BoundedWindow> {
   }
 
   /**
-   * Remove EPHEMERAL windows since we only need to know about them while processing new elements.
+   * Remove EPHEMERAL windows and remaining NEW windows since we only need to know about them
+   * while processing new elements.
    */
-  void removeEphemeralWindows();
+  void cleanupTemporaryWindows();
 
   /**
    * Save any state changes needed.
@@ -109,7 +111,7 @@ public interface ActiveWindowSet<W extends BoundedWindow> {
    * yet been seen.
    */
   @Nullable
-  W representative(W window);
+  W mergeResultWindow(W window);
 
   /**
    * Return (a view of) the set of currently ACTIVE windows.
@@ -122,16 +124,23 @@ public interface ActiveWindowSet<W extends BoundedWindow> {
   boolean isActive(W window);
 
   /**
-   * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
-   * as NEW. All NEW windows will be accounted for as ACTIVE, MERGED or EPHEMERAL by a call
-   * to {@link #merge}.
+   * Called when an incoming element indicates it is a member of {@code window}, but before we
+   * have started processing that element. If {@code window} is not already known to be ACTIVE,
+   * MERGED or EPHEMERAL then add it as NEW.
+   */
+  void ensureWindowExists(W window);
+
+  /**
+   * Called when a NEW or ACTIVE window is now known to be ACTIVE.
+   * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only state address window).
    */
-  void addNew(W window);
+  void ensureWindowIsActive(W window);
 
   /**
    * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
    * as ACTIVE.
    */
+  @VisibleForTesting
   void addActive(W window);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
index f4ffd99..8fee332 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -43,30 +44,30 @@ import javax.annotation.Nullable;
 
 /**
  * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations.
- *
+ *
  * <p>The underlying notion of {@link MergingActiveWindowSet} is that of representing equivalence
  * classes of merged windows as a mapping from the merged "super-window" to a set of
  * <i>state address</i> windows in which some state has been persisted. The mapping need not
  * contain EPHEMERAL windows, because they are created and merged without any persistent state.
  * Each window must be a state address window for at most one window, so the mapping is
  * invertible.
- *
+ *
  * <p>The states of a non-expired window are treated as follows:
- *
+ *
  * <ul>
- *   <li><b>NEW</b>: a NEW has an empty set of associated state address windows.</li>
- *   <li><b>ACTIVE</b>: an ACTIVE window will be associated with some nonempty set of state
- *       address windows. If the window has not merged, this will necessarily be the singleton set
- *       containing just itself, but it is not required that an ACTIVE window be amongst its
- *       state address windows.</li>
- *   <li><b>MERGED</b>: a MERGED window will be in the set of associated windows for some
- *       other window - that window is retrieved via {@link #representative} (this reverse
- *       association is implemented in O(1) time).</li>
- *   <li><b>EPHEMERAL</b>: EPHEMERAL windows are not persisted but are tracked transiently;
- *       an EPHEMERAL window must be registered with this {@link ActiveWindowSet} by a call
- *       to {@link #recordMerge} prior to any request for a {@link #representative}.</li>
+ * <li><b>NEW</b>: a NEW has an empty set of associated state address windows.</li>
+ * <li><b>ACTIVE</b>: an ACTIVE window will be associated with some nonempty set of state
+ * address windows. If the window has not merged, this will necessarily be the singleton set
+ * containing just itself, but it is not required that an ACTIVE window be amongst its
+ * state address windows.</li>
+ * <li><b>MERGED</b>: a MERGED window will be in the set of associated windows for some
+ * other window - that window is retrieved via {@link #mergeResultWindow} (this reverse
+ * association is implemented in O(1) time).</li>
+ * <li><b>EPHEMERAL</b>: EPHEMERAL windows are not persisted but are tracked transiently;
+ * an EPHEMERAL window must be registered with this {@link ActiveWindowSet} by a call
+ * to {@link #recordMerge} prior to any request for a {@link #mergeResultWindow}.</li>
  * </ul>
- *
+ *
  * <p>To illustrate why an ACTIVE window need not be amongst its own state address windows,
  * consider two active windows W1 and W2 that are merged to form W12. Further writes may be
  * applied to either of W1 or W2, since a read of W12 implies reading both of W12 and merging
@@ -74,6 +75,10 @@ import javax.annotation.Nullable;
  */
 public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
   private final WindowFn<Object, W> windowFn;
+
+  /**
+   * Map ACTIVE and NEW windows to their state address windows. Persisted.
+   */
   private final Map<W, Set<W>> activeWindowToStateAddressWindows;
 
   /**
@@ -82,10 +87,8 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
   private final Map<W, Set<W>> activeWindowToEphemeralWindows;
 
   /**
-   * A map from window to the ACTIVE window it has been merged into.
-   *
-   * <p>Does not need to be persisted.
-   *
+   * A map from window to the ACTIVE window it has been merged into. Does not need to be persisted.
+   *
    * <ul>
    * <li>Key window may be ACTIVE, MERGED or EPHEMERAL.
    * <li>ACTIVE windows map to themselves.
@@ -98,7 +101,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
 
   /**
    * Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
-   *
+   *
    * <p>Used to avoid writing to state if no changes have been made during the work unit.
    */
   private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
@@ -124,7 +127,19 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
   }
 
   @Override
-  public void removeEphemeralWindows() {
+  public void cleanupTemporaryWindows() {
+    // All NEW windows can be forgotten.
+    Iterator<Map.Entry<W, Set<W>>> iter =
+        activeWindowToStateAddressWindows.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<W, Set<W>> entry = iter.next();
+      if (entry.getValue().isEmpty()) {
+        windowToActiveWindow.remove(entry.getKey());
+        iter.remove();
+      }
+    }
+
+    // All EPHEMERAL windows can be forgotten.
     for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) {
       for (W ephemeral : entry.getValue()) {
         windowToActiveWindow.remove(ephemeral);
@@ -160,7 +175,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
 
   @Override
   @Nullable
-  public W representative(W window) {
+  public W mergeResultWindow(W window) {
     return windowToActiveWindow.get(window);
   }
 
@@ -175,13 +190,30 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
   }
 
   @Override
-  public void addNew(W window) {
+  public void ensureWindowExists(W window) {
     if (!windowToActiveWindow.containsKey(window)) {
+      Preconditions.checkState(!activeWindowToStateAddressWindows.containsKey(window));
       activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>());
+      windowToActiveWindow.put(window, window);
     }
   }
 
   @Override
+  public void ensureWindowIsActive(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    Preconditions.checkState(stateAddressWindows != null,
+        "Cannot ensure window %s is active since it is neither ACTIVE nor NEW",
+        window);
+    if (stateAddressWindows.isEmpty()) {
+      // Window was NEW, make it ACTIVE.
+      Preconditions.checkState(windowToActiveWindow.containsKey(window)
+                               && windowToActiveWindow.get(window).equals(window));
+      stateAddressWindows.add(window);
+    }
+  }
+
+  @Override
+  @VisibleForTesting
   public void addActive(W window) {
     if (!windowToActiveWindow.containsKey(window)) {
       Set<W> stateAddressWindows = new LinkedHashSet<>();
@@ -193,21 +225,17 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
 
   @Override
   public void remove(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    if (stateAddressWindows == null) {
-      // Window is no longer active.
-      return;
-    }
-    for (W stateAddressWindow : stateAddressWindows) {
-      windowToActiveWindow.remove(stateAddressWindow);
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.remove(window);
+    if (stateAddressWindows != null) {
+      for (W stateAddressWindow : stateAddressWindows) {
+        windowToActiveWindow.remove(stateAddressWindow);
+      }
     }
-    activeWindowToStateAddressWindows.remove(window);
-    Set<W> ephemeralWindows = activeWindowToEphemeralWindows.get(window);
+    Set<W> ephemeralWindows = activeWindowToEphemeralWindows.remove(window);
     if (ephemeralWindows != null) {
       for (W ephemeralWindow : ephemeralWindows) {
         windowToActiveWindow.remove(ephemeralWindow);
       }
-      activeWindowToEphemeralWindows.remove(window);
     }
     windowToActiveWindow.remove(window);
   }
@@ -292,16 +320,6 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
 
     // Actually do the merging and invoke the callbacks.
     context.recordMerges();
-
-    // Any remaining NEW windows should become implicitly ACTIVE.
-    for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
-      if (entry.getValue().isEmpty()) {
-        // This window was NEW but since it survived merging must now become ACTIVE.
-        W window = entry.getKey();
-        entry.getValue().add(window);
-        windowToActiveWindow.put(window, window);
-      }
-    }
   }
 
   /**
@@ -435,12 +453,15 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
     for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
       W active = entry.getKey();
       Preconditions.checkState(!entry.getValue().isEmpty(),
-          "Unexpected empty state address window set for ACTIVE window %s", active);
+          "Unexpected empty state address window set for ACTIVE window %s",
+          active);
       for (W stateAddressWindow : entry.getValue()) {
         Preconditions.checkState(knownStateAddressWindows.add(stateAddressWindow),
-            "%s is in more than one state address window set", stateAddressWindow);
+            "%s is in more than one state address window set",
+            stateAddressWindow);
         Preconditions.checkState(active.equals(windowToActiveWindow.get(stateAddressWindow)),
-            "%s should have %s as its ACTIVE window", stateAddressWindow, active);
+            "%s should have %s as its ACTIVE window", stateAddressWindow,
+            active);
       }
     }
     for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) {
@@ -451,14 +472,16 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
           !entry.getValue().isEmpty(), "Unexpected empty EPHEMERAL set for %s", active);
       for (W ephemeralWindow : entry.getValue()) {
         Preconditions.checkState(knownStateAddressWindows.add(ephemeralWindow),
-            "%s is EPHEMERAL/state address of more than one ACTIVE window", ephemeralWindow);
+            "%s is EPHEMERAL/state address of more than one ACTIVE window",
+            ephemeralWindow);
         Preconditions.checkState(active.equals(windowToActiveWindow.get(ephemeralWindow)),
             "%s should have %s as its ACTIVE window", ephemeralWindow, active);
       }
     }
     for (Map.Entry<W, W> entry : windowToActiveWindow.entrySet()) {
       Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(entry.getValue()),
-          "%s should be ACTIVE since representative for %s", entry.getValue(), entry.getKey());
+          "%s should be ACTIVE since mergeResultWindow for %s",
+          entry.getValue(), entry.getKey());
     }
   }
 
@@ -521,7 +544,9 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
     }
   }
 
-  /** Return a deep copy of {@code multimap}. */
+  /**
+   * Return a deep copy of {@code multimap}.
+   */
   private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
     Map<W, Set<W>> newMultimap = new HashMap<>();
     for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
@@ -530,15 +555,19 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
     return newMultimap;
   }
 
-  /** Return inversion of {@code multimap}, which must be invertible. */
+  /**
+   * Return inversion of {@code multimap}, which must be invertible.
+   */
   private static <W> Map<W, W> invert(Map<W, Set<W>> multimap) {
     Map<W, W> result = new HashMap<>();
     for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
       W active = entry.getKey();
       for (W target : entry.getValue()) {
         W previous = result.put(target, active);
-        Preconditions.checkState(previous == null,
-            "Window %s has both %s and %s as representatives", target, previous, active);
+        Preconditions.checkState(
+            previous == null,
+            "Multimap is not invertible: Window %s has both %s and %s as representatives",
+            target, previous, active);
       }
     }
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
index 260193e..52d8275 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
@@ -19,6 +19,7 @@ package com.google.cloud.dataflow.sdk.util;
 
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
 import java.util.Collection;
@@ -32,13 +33,13 @@ import java.util.Set;
  */
 public class NonMergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
   @Override
-  public void removeEphemeralWindows() {}
+  public void cleanupTemporaryWindows() {}
 
   @Override
   public void persist() {}
 
   @Override
-  public W representative(W window) {
+  public W mergeResultWindow(W window) {
     // Always represented by itself.
     return window;
   }
@@ -56,9 +57,13 @@ public class NonMergingActiveWindowSet<W extends BoundedWindow> implements Activ
   }
 
   @Override
-  public void addNew(W window) {}
+  public void ensureWindowExists(W window) {}
 
   @Override
+  public void ensureWindowIsActive(W window) {}
+
+  @Override
+  @VisibleForTesting
   public void addActive(W window) {}
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2415dab..f4e9d11 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -248,6 +248,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     return triggerRunner.isClosed(contextFactory.base(window, StateStyle.DIRECT).state());
   }
 
+  @VisibleForTesting
+  boolean hasNoActiveWindows() {
+    return activeWindows.getActiveWindows().isEmpty();
+  }
+
   /**
    * Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
    * triggers, and window merging.
@@ -295,7 +300,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
 
     // We're all done with merging and emitting elements so can compress the activeWindow state.
-    activeWindows.removeEphemeralWindows();
+    // Any windows which are still NEW must have come in on a new element which was then discarded
+    // due to the window's trigger being closed. We can thus delete them.
+    // Any windows which are EPHEMERAL must have come in on a new element but been merged away
+    // into some other ACTIVE window. We can thus also delete them.
+    activeWindows.cleanupTemporaryWindows();
   }
 
   public void persist() {
@@ -318,14 +327,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
         @SuppressWarnings("unchecked")
         W window = (W) untypedWindow;
 
-        ReduceFn<K, InputT, OutputT, W>.Context directContext =
-            contextFactory.base(window, StateStyle.DIRECT);
-        if (triggerRunner.isClosed(directContext.state())) {
-          // This window has already been closed.
-          // We will update the counter for this in the corresponding processElement call.
-          continue;
-        }
-
+        // For backwards compat with pre 1.4 only.
         if (activeWindows.isActive(window)) {
           Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
           if (stateAddressWindows.size() > 1) {
@@ -339,8 +341,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           }
         }
 
-        // Add this window as NEW if we've not yet seen it.
-        activeWindows.addNew(window);
+        // Add this window as NEW if it is not currently ACTIVE or MERGED.
+        // If we had already seen this window and closed its trigger, then the
+        // window will not be ACTIVE or MERGED. It will then be added as NEW here,
+        // and fall into the merging logic as usual.
+        activeWindows.ensureWindowExists(window);
       }
     }
 
@@ -392,7 +397,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       // Merge non-empty pane state.
       nonEmptyPanes.onMerge(renamedMergeContext.state());
 
-      // Have the trigger merge state as needed
+      // Have the trigger merge state as needed.
       triggerRunner.onMerge(
           directMergeContext.window(), directMergeContext.timers(), directMergeContext.state());
 
@@ -441,19 +446,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
 
       ReduceFn<K, InputT, OutputT, W>.Context directContext =
           contextFactory.base(window, StateStyle.DIRECT);
-      if (triggerRunner.isClosed(directContext.state())) {
-        // This window has already been closed.
-        droppedDueToClosedWindow.addValue(1L);
-        WindowTracing.debug(
-            "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
-            + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
-            value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
-            timerInternals.currentOutputWatermarkTime());
-        continue;
-      }
-
-      W active = activeWindows.representative(window);
-      Preconditions.checkState(active != null, "Window %s has no representative", window);
+      W active = activeWindows.mergeResultWindow(window);
+      Preconditions.checkState(active != null, "Window %s has no mergeResultWindow", window);
       windows.add(active);
     }
 
@@ -464,13 +458,28 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       triggerRunner.prefetchForValue(window, directContext.state());
     }
 
-    // Process the element for each (representative, not closed) window it belongs to.
+    // Process the element for each (mergeResultWindow, not closed) window it belongs to.
+    List<W> triggerableWindows = new ArrayList<>(windows.size());
     for (W window : windows) {
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+
+      if (triggerRunner.isClosed(directContext.state())) {
+        // This window has already been closed.
+        droppedDueToClosedWindow.addValue(1L);
+        WindowTracing.debug(
+            "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+            + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
+            value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+            timerInternals.currentOutputWatermarkTime());
+        continue;
+      }
+
+      triggerableWindows.add(window);
+      activeWindows.ensureWindowIsActive(window);
+
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
-
       nonEmptyPanes.recordContent(renamedContext.state());
 
       // Make sure we've scheduled the end-of-window or garbage collection timer for this window.
@@ -511,7 +520,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       // (We don't actually assert this since it is too slow.)
     }
 
-    return windows;
+    return triggerableWindows;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java
index 802ceae..32f3c1b 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java
@@ -83,20 +83,23 @@ public class MergingActiveWindowSetTest {
     };
 
     for (IntervalWindow window : windowFn.assignWindows(context)) {
-      set.addNew(window);
+      set.ensureWindowExists(window);
     }
   }
 
   private void merge(ActiveWindowSet.MergeCallback<IntervalWindow> callback) throws Exception {
     System.out.println("MERGE");
     set.merge(callback);
+    for (IntervalWindow window : set.getActiveWindows()) {
+      set.ensureWindowIsActive(window);
+    }
     set.checkInvariants();
     System.out.println(set);
   }
 
   private void pruneAndPersist() {
     System.out.println("PRUNE");
-    set.removeEphemeralWindows();
+    set.cleanupTemporaryWindows();
     set.checkInvariants();
     System.out.println(set);
     set.persist();
@@ -127,10 +130,10 @@ public class MergingActiveWindowSetTest {
     verify(callback).onMerge(ImmutableList.of(window(1, 10), window(2, 10)),
         ImmutableList.<IntervalWindow>of(), window(1, 11));
     assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), set.getActiveWindows());
-    assertEquals(window(1, 11), set.representative(window(1, 10)));
-    assertEquals(window(1, 11), set.representative(window(2, 10)));
-    assertEquals(window(1, 11), set.representative(window(1, 11)));
-    assertEquals(window(15, 10), set.representative(window(15, 10)));
+    assertEquals(window(1, 11), set.mergeResultWindow(window(1, 10)));
+    assertEquals(window(1, 11), set.mergeResultWindow(window(2, 10)));
+    assertEquals(window(1, 11), set.mergeResultWindow(window(1, 11)));
+    assertEquals(window(15, 10), set.mergeResultWindow(window(15, 10)));
     assertEquals(
         ImmutableSet.<IntervalWindow>of(window(1, 11)), set.readStateAddresses(window(1, 11)));
     assertEquals(
@@ -146,7 +149,7 @@ public class MergingActiveWindowSetTest {
     verify(callback).onMerge(ImmutableList.of(window(1, 11), window(3, 10)),
         ImmutableList.<IntervalWindow>of(window(1, 11)), window(1, 12));
     assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), set.getActiveWindows());
-    assertEquals(window(1, 12), set.representative(window(3, 10)));
+    assertEquals(window(1, 12), set.mergeResultWindow(window(3, 10)));
 
     // NEW 8+10
     // =>
@@ -159,9 +162,9 @@ public class MergingActiveWindowSetTest {
     verify(callback).onMerge(ImmutableList.of(window(1, 12), window(8, 10), window(15, 10)),
         ImmutableList.<IntervalWindow>of(window(1, 12), window(15, 10)), window(1, 24));
     assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveWindows());
-    assertEquals(window(1, 24), set.representative(window(1, 12)));
-    assertEquals(window(1, 24), set.representative(window(1, 11)));
-    assertEquals(window(1, 24), set.representative(window(15, 10)));
+    assertEquals(window(1, 24), set.mergeResultWindow(window(1, 12)));
+    assertEquals(window(1, 24), set.mergeResultWindow(window(1, 11)));
+    assertEquals(window(1, 24), set.mergeResultWindow(window(15, 10)));
 
     // NEW 9+10
     // =>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index b58e360..b2d246e 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -725,8 +725,40 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * Ensure a closed trigger has its state recorded in the merge result window.
+   */
+  @Test
+  public void testMergingWithCloseTrigger() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+                                    AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+                                    ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Create a new merged session window.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+                          TimestampedValue.of(2, new Instant(2)));
+
+    // Force the trigger to be closed for the merged window.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.advanceInputWatermark(new Instant(13));
+
+    // Trigger is now closed.
+    assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+
+    // Revisit the same session window.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+                          TimestampedValue.of(2, new Instant(2)));
+
+    // Trigger is still closed.
+    assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+  }
+
+  /**
    * If a later event tries to reuse an earlier session window which has been closed, we
-   * should reject that element and not fail due to the window no longer having a representative.
+   * should reject that element and not fail due to the window no longer being active.
    */
   @Test
   public void testMergingWithReusedWindow() throws Exception {
@@ -747,6 +779,9 @@ public class ReduceFnRunnerTest {
     // Should be discarded with 'window closed'.
     tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
 
+    // Discarding the element must not leave any window behind in the active window set.
+    assertTrue(tester.hasNoActiveWindows());
+
     // Now the garbage collection timer will fire, finding the trigger already closed.
     tester.advanceInputWatermark(new Instant(100));
 
@@ -763,6 +798,91 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * When a merged window's trigger is closed, that state is recorded against the merged window
+   * rather than the original windows, so later elements merging into it are dropped.
+   */
+  @Test
+  public void testMergingWithClosedRepresentative() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+                                    AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+                                    ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Two elements whose sessions merge into [1, 18).
+    // Close the trigger, but the garbage collection timer is still pending.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),       // in [1, 11), gc at 21.
+                          TimestampedValue.of(8, new Instant(8)));      // in [8, 18), gc at 28.
+
+    // More elements whose sessions merge into the same, already closed, window [1, 18).
+    // That window has not been garbage collected yet, so these
+    // elements should be discarded with 'window closed'.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),      // in [1, 11), gc at 21.
+                          TimestampedValue.of(2, new Instant(2)),      // in [2, 12), gc at 22.
+                          TimestampedValue.of(8, new Instant(8)));     // in [8, 18), gc at 28.
+
+    // Now the garbage collection timer will fire, finding the trigger already closed.
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+
+    assertThat(output.size(), equalTo(1));
+    assertThat(output.get(0),
+               isSingleWindowedValue(containsInAnyOrder(1, 8),
+                                     1, // timestamp
+                                     1, // window start
+                                     18)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
+  }
+
+  /**
+   * If an element for a closed session window ends up being merged with still-open session
+   * windows, the resulting merged window is not 'poisoned' and still collects that element.
+   */
+  @Test
+  public void testMergingWithClosedDoesNotPoison() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // One element in [2, 12); force its trigger to fire and finish.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.injectElements(TimestampedValue.of(2, new Instant(2)));
+
+    // Three elements whose sessions merge into [1, 13); the one at 2 maps to closed [2, 12).
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(2, new Instant(2)),
+        TimestampedValue.of(3, new Instant(3)));
+
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(2));
+    assertThat(output.get(0),
+        isSingleWindowedValue(containsInAnyOrder(2),
+            2, // timestamp
+            2, // window start
+            12)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
+    assertThat(output.get(1),
+        isSingleWindowedValue(containsInAnyOrder(1, 2, 3),
+            1, // timestamp
+            1, // window start
+            13)); // window end
+    assertThat(
+        output.get(1).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+  }
+
+  /**
    * Tests that when data is assigned to multiple windows but some of those windows have
    * had their triggers finish, then the data is dropped and counted accurately.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
index f947ba3..f10341b 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
@@ -236,6 +236,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     return createRunner().isFinished(window);
   }
 
+  public boolean hasNoActiveWindows() {
+    return createRunner().hasNoActiveWindows(); // True iff a freshly created runner has none.
+  }
+
   @SafeVarargs
   public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
index fdfe1e6..75708c4 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
@@ -263,7 +263,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
       for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
         // SDK is responsible for type safety
         @SuppressWarnings("unchecked")
-        W window = activeWindows.representative((W) untypedWindow);
+        W window = activeWindows.mergeResultWindow((W) untypedWindow);
 
         Trigger<W>.OnElementContext context = contextFactory.createOnElementContext(window,
             new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(),