You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/06 04:10:45 UTC
[2/3] incubator-beam git commit: Check for closed windows
post-merging rather than pre-merging. Make sure we garbage collect NEW
windows which end up being for closed windows. Add unit tests to confirm.
Check for closed windows post-merging rather than pre-merging.
Make sure we garbage collect NEW windows which end up being for closed windows.
Add unit tests to confirm.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c22a05c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c22a05c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c22a05c
Branch: refs/heads/master
Commit: 2c22a05c58b7f1ac9cbb86bbd9f3f4f9fda9bc39
Parents: 99c0fa7
Author: Mark Shields <ma...@google.com>
Authored: Thu Mar 31 11:36:01 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Apr 5 19:07:10 2016 -0700
----------------------------------------------------------------------
.../dataflow/sdk/util/ActiveWindowSet.java | 23 +++-
.../sdk/util/MergingActiveWindowSet.java | 133 +++++++++++--------
.../sdk/util/NonMergingActiveWindowSet.java | 11 +-
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 65 +++++----
.../sdk/util/MergingActiveWindowSetTest.java | 23 ++--
.../dataflow/sdk/util/ReduceFnRunnerTest.java | 122 ++++++++++++++++-
.../cloud/dataflow/sdk/util/ReduceFnTester.java | 4 +
.../cloud/dataflow/sdk/util/TriggerTester.java | 2 +-
8 files changed, 281 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
index 9476254..1782ae3 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
@@ -20,6 +20,7 @@ package com.google.cloud.dataflow.sdk.util;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.Set;
@@ -94,9 +95,10 @@ public interface ActiveWindowSet<W extends BoundedWindow> {
}
/**
- * Remove EPHEMERAL windows since we only need to know about them while processing new elements.
+ * Remove EPHEMERAL windows and remaining NEW windows since we only need to know about them
+ * while processing new elements.
*/
- void removeEphemeralWindows();
+ void cleanupTemporaryWindows();
/**
* Save any state changes needed.
@@ -109,7 +111,7 @@ public interface ActiveWindowSet<W extends BoundedWindow> {
* yet been seen.
*/
@Nullable
- W representative(W window);
+ W mergeResultWindow(W window);
/**
* Return (a view of) the set of currently ACTIVE windows.
@@ -122,16 +124,23 @@ public interface ActiveWindowSet<W extends BoundedWindow> {
boolean isActive(W window);
/**
- * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
- * as NEW. All NEW windows will be accounted for as ACTIVE, MERGED or EPHEMERAL by a call
- * to {@link #merge}.
+ * Called when an incoming element indicates it is a member of {@code window}, but before we
+ * have started processing that element. If {@code window} is not already known to be ACTIVE,
+ * MERGED or EPHEMERAL then add it as NEW.
+ */
+ void ensureWindowExists(W window);
+
+ /**
+ * Called when a NEW or ACTIVE window is now known to be ACTIVE.
+ * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only state address window).
*/
- void addNew(W window);
+ void ensureWindowIsActive(W window);
/**
* If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
* as ACTIVE.
*/
+ @VisibleForTesting
void addActive(W window);
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
index f4ffd99..8fee332 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -43,30 +44,30 @@ import javax.annotation.Nullable;
/**
* An {@link ActiveWindowSet} for merging {@link WindowFn} implementations.
- *
+ * <p>
* <p>The underlying notion of {@link MergingActiveWindowSet} is that of representing equivalence
* classes of merged windows as a mapping from the merged "super-window" to a set of
* <i>state address</i> windows in which some state has been persisted. The mapping need not
* contain EPHEMERAL windows, because they are created and merged without any persistent state.
* Each window must be a state address window for at most one window, so the mapping is
* invertible.
- *
+ * <p>
* <p>The states of a non-expired window are treated as follows:
- *
+ * <p>
* <ul>
- * <li><b>NEW</b>: a NEW has an empty set of associated state address windows.</li>
- * <li><b>ACTIVE</b>: an ACTIVE window will be associated with some nonempty set of state
- * address windows. If the window has not merged, this will necessarily be the singleton set
- * containing just itself, but it is not required that an ACTIVE window be amongst its
- * state address windows.</li>
- * <li><b>MERGED</b>: a MERGED window will be in the set of associated windows for some
- * other window - that window is retrieved via {@link #representative} (this reverse
- * association is implemented in O(1) time).</li>
- * <li><b>EPHEMERAL</b>: EPHEMERAL windows are not persisted but are tracked transiently;
- * an EPHEMERAL window must be registered with this {@link ActiveWindowSet} by a call
- * to {@link #recordMerge} prior to any request for a {@link #representative}.</li>
+ * <li><b>NEW</b>: a NEW has an empty set of associated state address windows.</li>
+ * <li><b>ACTIVE</b>: an ACTIVE window will be associated with some nonempty set of state
+ * address windows. If the window has not merged, this will necessarily be the singleton set
+ * containing just itself, but it is not required that an ACTIVE window be amongst its
+ * state address windows.</li>
+ * <li><b>MERGED</b>: a MERGED window will be in the set of associated windows for some
+ * other window - that window is retrieved via {@link #mergeResultWindow} (this reverse
+ * association is implemented in O(1) time).</li>
+ * <li><b>EPHEMERAL</b>: EPHEMERAL windows are not persisted but are tracked transiently;
+ * an EPHEMERAL window must be registered with this {@link ActiveWindowSet} by a call
+ * to {@link #recordMerge} prior to any request for a {@link #mergeResultWindow}.</li>
* </ul>
- *
+ * <p>
* <p>To illustrate why an ACTIVE window need not be amongst its own state address windows,
* consider two active windows W1 and W2 that are merged to form W12. Further writes may be
* applied to either of W1 or W2, since a read of W12 implies reading both of W1 and W2 and merging
@@ -74,6 +75,10 @@ import javax.annotation.Nullable;
*/
public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
private final WindowFn<Object, W> windowFn;
+
+ /**
+ * Map ACTIVE and NEW windows to their state address windows. Persisted.
+ */
private final Map<W, Set<W>> activeWindowToStateAddressWindows;
/**
@@ -82,10 +87,8 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
private final Map<W, Set<W>> activeWindowToEphemeralWindows;
/**
- * A map from window to the ACTIVE window it has been merged into.
- *
- * <p>Does not need to be persisted.
- *
+ * A map from window to the ACTIVE window it has been merged into. Does not need to be persisted.
+ * <p>
* <ul>
* <li>Key window may be ACTIVE, MERGED or EPHEMERAL.
* <li>ACTIVE windows map to themselves.
@@ -98,7 +101,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
/**
* Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
- *
+ * <p>
* <p>Used to avoid writing to state if no changes have been made during the work unit.
*/
private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
@@ -124,7 +127,19 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
}
@Override
- public void removeEphemeralWindows() {
+ public void cleanupTemporaryWindows() {
+ // All NEW windows can be forgotten.
+ Iterator<Map.Entry<W, Set<W>>> iter =
+ activeWindowToStateAddressWindows.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<W, Set<W>> entry = iter.next();
+ if (entry.getValue().isEmpty()) {
+ windowToActiveWindow.remove(entry.getKey());
+ iter.remove();
+ }
+ }
+
+ // All EPHEMERAL windows can be forgotten.
for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) {
for (W ephemeral : entry.getValue()) {
windowToActiveWindow.remove(ephemeral);
@@ -160,7 +175,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
@Override
@Nullable
- public W representative(W window) {
+ public W mergeResultWindow(W window) {
return windowToActiveWindow.get(window);
}
@@ -175,13 +190,30 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
}
@Override
- public void addNew(W window) {
+ public void ensureWindowExists(W window) {
if (!windowToActiveWindow.containsKey(window)) {
+ Preconditions.checkState(!activeWindowToStateAddressWindows.containsKey(window));
activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>());
+ windowToActiveWindow.put(window, window);
}
}
@Override
+ public void ensureWindowIsActive(W window) {
+ Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+ Preconditions.checkState(stateAddressWindows != null,
+ "Cannot ensure window %s is active since it is neither ACTIVE nor NEW",
+ window);
+ if (stateAddressWindows.isEmpty()) {
+ // Window was NEW, make it ACTIVE.
+ Preconditions.checkState(windowToActiveWindow.containsKey(window)
+ && windowToActiveWindow.get(window).equals(window));
+ stateAddressWindows.add(window);
+ }
+ }
+
+ @Override
+ @VisibleForTesting
public void addActive(W window) {
if (!windowToActiveWindow.containsKey(window)) {
Set<W> stateAddressWindows = new LinkedHashSet<>();
@@ -193,21 +225,17 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
@Override
public void remove(W window) {
- Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
- if (stateAddressWindows == null) {
- // Window is no longer active.
- return;
- }
- for (W stateAddressWindow : stateAddressWindows) {
- windowToActiveWindow.remove(stateAddressWindow);
+ Set<W> stateAddressWindows = activeWindowToStateAddressWindows.remove(window);
+ if (stateAddressWindows != null) {
+ for (W stateAddressWindow : stateAddressWindows) {
+ windowToActiveWindow.remove(stateAddressWindow);
+ }
}
- activeWindowToStateAddressWindows.remove(window);
- Set<W> ephemeralWindows = activeWindowToEphemeralWindows.get(window);
+ Set<W> ephemeralWindows = activeWindowToEphemeralWindows.remove(window);
if (ephemeralWindows != null) {
for (W ephemeralWindow : ephemeralWindows) {
windowToActiveWindow.remove(ephemeralWindow);
}
- activeWindowToEphemeralWindows.remove(window);
}
windowToActiveWindow.remove(window);
}
@@ -292,16 +320,6 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
// Actually do the merging and invoke the callbacks.
context.recordMerges();
-
- // Any remaining NEW windows should become implicitly ACTIVE.
- for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
- if (entry.getValue().isEmpty()) {
- // This window was NEW but since it survived merging must now become ACTIVE.
- W window = entry.getKey();
- entry.getValue().add(window);
- windowToActiveWindow.put(window, window);
- }
- }
}
/**
@@ -435,12 +453,15 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
W active = entry.getKey();
Preconditions.checkState(!entry.getValue().isEmpty(),
- "Unexpected empty state address window set for ACTIVE window %s", active);
+ "Unexpected empty state address window set for ACTIVE window %s",
+ active);
for (W stateAddressWindow : entry.getValue()) {
Preconditions.checkState(knownStateAddressWindows.add(stateAddressWindow),
- "%s is in more than one state address window set", stateAddressWindow);
+ "%s is in more than one state address window set",
+ stateAddressWindow);
Preconditions.checkState(active.equals(windowToActiveWindow.get(stateAddressWindow)),
- "%s should have %s as its ACTIVE window", stateAddressWindow, active);
+ "%s should have %s as its ACTIVE window", stateAddressWindow,
+ active);
}
}
for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) {
@@ -451,14 +472,16 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
!entry.getValue().isEmpty(), "Unexpected empty EPHEMERAL set for %s", active);
for (W ephemeralWindow : entry.getValue()) {
Preconditions.checkState(knownStateAddressWindows.add(ephemeralWindow),
- "%s is EPHEMERAL/state address of more than one ACTIVE window", ephemeralWindow);
+ "%s is EPHEMERAL/state address of more than one ACTIVE window",
+ ephemeralWindow);
Preconditions.checkState(active.equals(windowToActiveWindow.get(ephemeralWindow)),
"%s should have %s as its ACTIVE window", ephemeralWindow, active);
}
}
for (Map.Entry<W, W> entry : windowToActiveWindow.entrySet()) {
Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(entry.getValue()),
- "%s should be ACTIVE since representative for %s", entry.getValue(), entry.getKey());
+ "%s should be ACTIVE since mergeResultWindow for %s",
+ entry.getValue(), entry.getKey());
}
}
@@ -521,7 +544,9 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
}
}
- /** Return a deep copy of {@code multimap}. */
+ /**
+ * Return a deep copy of {@code multimap}.
+ */
private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
Map<W, Set<W>> newMultimap = new HashMap<>();
for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
@@ -530,15 +555,19 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
return newMultimap;
}
- /** Return inversion of {@code multimap}, which must be invertible. */
+ /**
+ * Return inversion of {@code multimap}, which must be invertible.
+ */
private static <W> Map<W, W> invert(Map<W, Set<W>> multimap) {
Map<W, W> result = new HashMap<>();
for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
W active = entry.getKey();
for (W target : entry.getValue()) {
W previous = result.put(target, active);
- Preconditions.checkState(previous == null,
- "Window %s has both %s and %s as representatives", target, previous, active);
+ Preconditions.checkState(
+ previous == null,
+ "Multimap is not invertible: Window %s has both %s and %s as representatives",
+ target, previous, active);
}
}
return result;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
index 260193e..52d8275 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/NonMergingActiveWindowSet.java
@@ -19,6 +19,7 @@ package com.google.cloud.dataflow.sdk.util;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import java.util.Collection;
@@ -32,13 +33,13 @@ import java.util.Set;
*/
public class NonMergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
@Override
- public void removeEphemeralWindows() {}
+ public void cleanupTemporaryWindows() {}
@Override
public void persist() {}
@Override
- public W representative(W window) {
+ public W mergeResultWindow(W window) {
// Always represented by itself.
return window;
}
@@ -56,9 +57,13 @@ public class NonMergingActiveWindowSet<W extends BoundedWindow> implements Activ
}
@Override
- public void addNew(W window) {}
+ public void ensureWindowExists(W window) {}
@Override
+ public void ensureWindowIsActive(W window) {}
+
+ @Override
+ @VisibleForTesting
public void addActive(W window) {}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2415dab..f4e9d11 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -248,6 +248,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
return triggerRunner.isClosed(contextFactory.base(window, StateStyle.DIRECT).state());
}
+ @VisibleForTesting
+ boolean hasNoActiveWindows() {
+ return activeWindows.getActiveWindows().isEmpty();
+ }
+
/**
* Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
* triggers, and window merging.
@@ -295,7 +300,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
// We're all done with merging and emitting elements so can compress the activeWindow state.
- activeWindows.removeEphemeralWindows();
+ // Any windows which are still NEW must have come in on a new element which was then discarded
+ // due to the window's trigger being closed. We can thus delete them.
+ // Any windows which are EPHEMERAL must have come in on a new element but been merged away
+ // into some other ACTIVE window. We can thus also delete them.
+ activeWindows.cleanupTemporaryWindows();
}
public void persist() {
@@ -318,14 +327,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
@SuppressWarnings("unchecked")
W window = (W) untypedWindow;
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(window, StateStyle.DIRECT);
- if (triggerRunner.isClosed(directContext.state())) {
- // This window has already been closed.
- // We will update the counter for this in the corresponding processElement call.
- continue;
- }
-
+ // For backwards compat with pre 1.4 only.
if (activeWindows.isActive(window)) {
Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
if (stateAddressWindows.size() > 1) {
@@ -339,8 +341,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
}
- // Add this window as NEW if we've not yet seen it.
- activeWindows.addNew(window);
+ // Add this window as NEW if it is not currently ACTIVE or MERGED.
+ // If we had already seen this window and closed its trigger, then the
+ // window will not be ACTIVE or MERGED. It will then be added as NEW here,
+ // and fall into the merging logic as usual.
+ activeWindows.ensureWindowExists(window);
}
}
@@ -392,7 +397,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// Merge non-empty pane state.
nonEmptyPanes.onMerge(renamedMergeContext.state());
- // Have the trigger merge state as needed
+ // Have the trigger merge state as needed.
triggerRunner.onMerge(
directMergeContext.window(), directMergeContext.timers(), directMergeContext.state());
@@ -441,19 +446,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
ReduceFn<K, InputT, OutputT, W>.Context directContext =
contextFactory.base(window, StateStyle.DIRECT);
- if (triggerRunner.isClosed(directContext.state())) {
- // This window has already been closed.
- droppedDueToClosedWindow.addValue(1L);
- WindowTracing.debug(
- "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
- + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
- value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- continue;
- }
-
- W active = activeWindows.representative(window);
- Preconditions.checkState(active != null, "Window %s has no representative", window);
+ W active = activeWindows.mergeResultWindow(window);
+ Preconditions.checkState(active != null, "Window %s has no mergeResultWindow", window);
windows.add(active);
}
@@ -464,13 +458,28 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
triggerRunner.prefetchForValue(window, directContext.state());
}
- // Process the element for each (representative, not closed) window it belongs to.
+ // Process the element for each merge result window it belongs to, skipping closed windows.
+ List<W> triggerableWindows = new ArrayList<>(windows.size());
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+
+ if (triggerRunner.isClosed(directContext.state())) {
+ // This window has already been closed.
+ droppedDueToClosedWindow.addValue(1L);
+ WindowTracing.debug(
+ "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+ + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
+ value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ continue;
+ }
+
+ triggerableWindows.add(window);
+ activeWindows.ensureWindowIsActive(window);
+
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
-
nonEmptyPanes.recordContent(renamedContext.state());
// Make sure we've scheduled the end-of-window or garbage collection timer for this window.
@@ -511,7 +520,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// (We don't actually assert this since it is too slow.)
}
- return windows;
+ return triggerableWindows;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java
index 802ceae..32f3c1b 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSetTest.java
@@ -83,20 +83,23 @@ public class MergingActiveWindowSetTest {
};
for (IntervalWindow window : windowFn.assignWindows(context)) {
- set.addNew(window);
+ set.ensureWindowExists(window);
}
}
private void merge(ActiveWindowSet.MergeCallback<IntervalWindow> callback) throws Exception {
System.out.println("MERGE");
set.merge(callback);
+ for (IntervalWindow window : set.getActiveWindows()) {
+ set.ensureWindowIsActive(window);
+ }
set.checkInvariants();
System.out.println(set);
}
private void pruneAndPersist() {
System.out.println("PRUNE");
- set.removeEphemeralWindows();
+ set.cleanupTemporaryWindows();
set.checkInvariants();
System.out.println(set);
set.persist();
@@ -127,10 +130,10 @@ public class MergingActiveWindowSetTest {
verify(callback).onMerge(ImmutableList.of(window(1, 10), window(2, 10)),
ImmutableList.<IntervalWindow>of(), window(1, 11));
assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), set.getActiveWindows());
- assertEquals(window(1, 11), set.representative(window(1, 10)));
- assertEquals(window(1, 11), set.representative(window(2, 10)));
- assertEquals(window(1, 11), set.representative(window(1, 11)));
- assertEquals(window(15, 10), set.representative(window(15, 10)));
+ assertEquals(window(1, 11), set.mergeResultWindow(window(1, 10)));
+ assertEquals(window(1, 11), set.mergeResultWindow(window(2, 10)));
+ assertEquals(window(1, 11), set.mergeResultWindow(window(1, 11)));
+ assertEquals(window(15, 10), set.mergeResultWindow(window(15, 10)));
assertEquals(
ImmutableSet.<IntervalWindow>of(window(1, 11)), set.readStateAddresses(window(1, 11)));
assertEquals(
@@ -146,7 +149,7 @@ public class MergingActiveWindowSetTest {
verify(callback).onMerge(ImmutableList.of(window(1, 11), window(3, 10)),
ImmutableList.<IntervalWindow>of(window(1, 11)), window(1, 12));
assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), set.getActiveWindows());
- assertEquals(window(1, 12), set.representative(window(3, 10)));
+ assertEquals(window(1, 12), set.mergeResultWindow(window(3, 10)));
// NEW 8+10
// =>
@@ -159,9 +162,9 @@ public class MergingActiveWindowSetTest {
verify(callback).onMerge(ImmutableList.of(window(1, 12), window(8, 10), window(15, 10)),
ImmutableList.<IntervalWindow>of(window(1, 12), window(15, 10)), window(1, 24));
assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveWindows());
- assertEquals(window(1, 24), set.representative(window(1, 12)));
- assertEquals(window(1, 24), set.representative(window(1, 11)));
- assertEquals(window(1, 24), set.representative(window(15, 10)));
+ assertEquals(window(1, 24), set.mergeResultWindow(window(1, 12)));
+ assertEquals(window(1, 24), set.mergeResultWindow(window(1, 11)));
+ assertEquals(window(1, 24), set.mergeResultWindow(window(15, 10)));
// NEW 9+10
// =>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index b58e360..b2d246e 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -725,8 +725,40 @@ public class ReduceFnRunnerTest {
}
/**
+ * Ensure a closed trigger has its state recorded in the merge result window.
+ */
+ @Test
+ public void testMergingWithCloseTrigger() throws Exception {
+ ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+ ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+ // Create a new merged session window.
+ tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+ TimestampedValue.of(2, new Instant(2)));
+
+ // Force the trigger to be closed for the merged window.
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTrigger);
+ tester.advanceInputWatermark(new Instant(13));
+
+ // Trigger is now closed.
+ assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+
+ // Revisit the same session window.
+ tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+ TimestampedValue.of(2, new Instant(2)));
+
+ // Trigger is still closed.
+ assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+ }
+
+ /**
* If a later event tries to reuse an earlier session window which has been closed, we
- * should reject that element and not fail due to the window no longer having a representative.
+ * should reject that element and not fail due to the window no longer being active.
*/
@Test
public void testMergingWithReusedWindow() throws Exception {
@@ -747,6 +779,9 @@ public class ReduceFnRunnerTest {
// Should be discarded with 'window closed'.
tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
+ // And nothing should be left in the active window state.
+ assertTrue(tester.hasNoActiveWindows());
+
// Now the garbage collection timer will fire, finding the trigger already closed.
tester.advanceInputWatermark(new Instant(100));
@@ -763,6 +798,91 @@ public class ReduceFnRunnerTest {
}
/**
+ * When a merged window's trigger is closed we record that state using the merged window rather
+ * than the original windows.
+ */
+ @Test
+ public void testMergingWithClosedRepresentative() throws Exception {
+ ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+ ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+ // 2 elements into merged session window.
+ // Close the trigger, but the garbage collection timer is still pending.
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTrigger);
+ tester.injectElements(TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21.
+ TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28.
+
+ // More elements into the same merged session window.
+ // It has not yet been gced.
+ // Should be discarded with 'window closed'.
+ tester.injectElements(TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21.
+ TimestampedValue.of(2, new Instant(2)), // in [2, 12), gc at 22.
+ TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28.
+
+ // Now the garbage collection timer will fire, finding the trigger already closed.
+ tester.advanceInputWatermark(new Instant(100));
+
+ List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+
+ assertThat(output.size(), equalTo(1));
+ assertThat(output.get(0),
+ isSingleWindowedValue(containsInAnyOrder(1, 8),
+ 1, // timestamp
+ 1, // window start
+ 18)); // window end
+ assertThat(
+ output.get(0).getPane(),
+ equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
+ }
+
+ /**
+ * If an element for a closed session window ends up being merged into other still-open
+ * session windows, the resulting session window is not 'poisoned'.
+ */
+ @Test
+ public void testMergingWithClosedDoesNotPoison() throws Exception {
+ ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+ ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+ // 1 element, force its trigger to close.
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTrigger);
+ tester.injectElements(TimestampedValue.of(2, new Instant(2)));
+
+ // 3 elements, one already closed.
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+ TimestampedValue.of(2, new Instant(2)),
+ TimestampedValue.of(3, new Instant(3)));
+
+ tester.advanceInputWatermark(new Instant(100));
+
+ List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+ assertThat(output.size(), equalTo(2));
+ assertThat(output.get(0),
+ isSingleWindowedValue(containsInAnyOrder(2),
+ 2, // timestamp
+ 2, // window start
+ 12)); // window end
+ assertThat(
+ output.get(0).getPane(),
+ equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
+ assertThat(output.get(1),
+ isSingleWindowedValue(containsInAnyOrder(1, 2, 3),
+ 1, // timestamp
+ 1, // window start
+ 13)); // window end
+ assertThat(
+ output.get(1).getPane(),
+ equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+ }
+
+ /**
* Tests that when data is assigned to multiple windows but some of those windows have
* had their triggers finish, then the data is dropped and counted accurately.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
index f947ba3..f10341b 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
@@ -236,6 +236,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
return createRunner().isFinished(window);
}
+ public boolean hasNoActiveWindows() {
+ return createRunner().hasNoActiveWindows();
+ }
+
@SafeVarargs
public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
assertHasOnlyGlobalAndAllowedTags(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c22a05c/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
index fdfe1e6..75708c4 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
@@ -263,7 +263,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
// SDK is responsible for type safety
@SuppressWarnings("unchecked")
- W window = activeWindows.representative((W) untypedWindow);
+ W window = activeWindows.mergeResultWindow((W) untypedWindow);
Trigger<W>.OnElementContext context = contextFactory.createOnElementContext(window,
new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(),