You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/01 16:49:25 UTC
flink git commit: [FLINK-4884] Eagerly Store MergingWindowSet in
State in WindowOperator
Repository: flink
Updated Branches:
refs/heads/master cfb3790fb -> ca68d2e07
[FLINK-4884] Eagerly Store MergingWindowSet in State in WindowOperator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca68d2e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca68d2e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca68d2e0
Branch: refs/heads/master
Commit: ca68d2e074eee66066377c3ebf69c0e15e9956d0
Parents: cfb3790
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 21 17:55:53 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Nov 1 14:02:54 2016 +0100
----------------------------------------------------------------------
.../windowing/EvictingWindowOperator.java | 2 +
.../operators/windowing/MergingWindowSet.java | 56 +++++++------
.../operators/windowing/WindowOperator.java | 77 ++++++------------
.../windowing/MergingWindowSetTest.java | 83 +++++++++++++++++++-
4 files changed, 140 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ca68d2e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 2f4dbde..f9b409e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -168,6 +168,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
}
}
+ mergingWindows.persist();
} else {
for (W window : elementWindows) {
@@ -308,6 +309,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
windowState.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
+ mergingWindows.persist();
}
context.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca68d2e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index 4e19c31..06cacad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -58,39 +58,51 @@ public class MergingWindowSet<W extends Window> {
* we are incrementally merging windows starting from some window we keep that starting
* window as the state window to prevent costly state juggling.
*/
- private final Map<W, W> windows;
+ private final Map<W, W> mapping;
/**
- * Our window assigner.
+ * Mapping when we created the {@code MergingWindowSet}. We use this to decide whether
+ * we need to persist any changes to state.
*/
- private final MergingWindowAssigner<?, W> windowAssigner;
+ private final Map<W, W> initialMapping;
+
+ private final ListState<Tuple2<W, W>> state;
/**
- * Creates a new {@link MergingWindowSet} that uses the given {@link MergingWindowAssigner}.
+ * Our window assigner.
*/
- public MergingWindowSet(MergingWindowAssigner<?, W> windowAssigner) {
- this.windowAssigner = windowAssigner;
- windows = new HashMap<>();
- }
+ private final MergingWindowAssigner<?, W> windowAssigner;
/**
* Restores a {@link MergingWindowSet} from the given state.
*/
public MergingWindowSet(MergingWindowAssigner<?, W> windowAssigner, ListState<Tuple2<W, W>> state) throws Exception {
this.windowAssigner = windowAssigner;
- windows = new HashMap<>();
+ mapping = new HashMap<>();
Iterable<Tuple2<W, W>> windowState = state.get();
if (windowState != null) {
for (Tuple2<W, W> window: windowState) {
- windows.put(window.f0, window.f1);
+ mapping.put(window.f0, window.f1);
}
}
+
+ this.state = state;
+
+ initialMapping = new HashMap<>();
+ initialMapping.putAll(mapping);
}
- public void persist(ListState<Tuple2<W, W>> state) throws Exception {
- for (Map.Entry<W, W> window: windows.entrySet()) {
- state.add(new Tuple2<>(window.getKey(), window.getValue()));
+ /**
+ * Persist the updated mapping to the given state if the mapping changed since
+ * initialization.
+ */
+ public void persist() throws Exception {
+ if (!mapping.equals(initialMapping)) {
+ state.clear();
+ for (Map.Entry<W, W> window : mapping.entrySet()) {
+ state.add(new Tuple2<>(window.getKey(), window.getValue()));
+ }
}
}
@@ -103,7 +115,7 @@ public class MergingWindowSet<W extends Window> {
* @param window The window for which to get the state window.
*/
public W getStateWindow(W window) {
- return windows.get(window);
+ return mapping.get(window);
}
/**
@@ -112,7 +124,7 @@ public class MergingWindowSet<W extends Window> {
* @param window The {@code Window} to remove.
*/
public void retireWindow(W window) {
- W removed = this.windows.remove(window);
+ W removed = this.mapping.remove(window);
if (removed == null) {
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
@@ -143,7 +155,7 @@ public class MergingWindowSet<W extends Window> {
List<W> windows = new ArrayList<>();
- windows.addAll(this.windows.keySet());
+ windows.addAll(this.mapping.keySet());
windows.add(newWindow);
final Map<W, Collection<W>> mergeResults = new HashMap<>();
@@ -173,18 +185,18 @@ public class MergingWindowSet<W extends Window> {
// pick any of the merged windows and choose that window's state window
// as the state window for the merge result
- W mergedStateWindow = this.windows.get(mergedWindows.iterator().next());
+ W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());
// figure out the state windows that we are merging
List<W> mergedStateWindows = new ArrayList<>();
for (W mergedWindow: mergedWindows) {
- W res = this.windows.remove(mergedWindow);
+ W res = this.mapping.remove(mergedWindow);
if (res != null) {
mergedStateWindows.add(res);
}
}
- this.windows.put(mergeResult, mergedStateWindow);
+ this.mapping.put(mergeResult, mergedStateWindow);
// don't put the target state window into the merged windows
mergedStateWindows.remove(mergedStateWindow);
@@ -195,14 +207,14 @@ public class MergingWindowSet<W extends Window> {
if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
mergeFunction.merge(mergeResult,
mergedWindows,
- this.windows.get(mergeResult),
+ this.mapping.get(mergeResult),
mergedStateWindows);
}
}
// the new window created a new, self-contained window without merging
if (resultWindow.equals(newWindow) && mergeResults.isEmpty()) {
- this.windows.put(resultWindow, resultWindow);
+ this.mapping.put(resultWindow, resultWindow);
}
return resultWindow;
@@ -229,7 +241,7 @@ public class MergingWindowSet<W extends Window> {
@Override
public String toString() {
return "MergingWindowSet{" +
- "windows=" + windows +
+ "windows=" + mapping +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca68d2e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b319828..c465767 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -36,8 +36,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -57,8 +55,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.io.Serializable;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -104,6 +100,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
+ protected final ListStateDescriptor<Tuple2<W, W>> mergingWindowsDescriptor;
+
/**
* For serializing the key in checkpoints.
*/
@@ -143,8 +141,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
private transient InternalTimerService<W> internalTimerService;
- protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey;
-
/**
* Creates a new {@code WindowOperator} based on the given policies and user functions.
*/
@@ -173,6 +169,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
this.trigger = checkNotNull(trigger);
this.allowedLateness = allowedLateness;
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
+ mergingWindowsDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+ } else {
+ mergingWindowsDescriptor = null;
+ }
+
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -193,10 +197,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
return internalTimerService.currentProcessingTime();
}
};
-
- if (windowAssigner instanceof MergingWindowAssigner) {
- mergingWindowsByKey = new HashMap<>();
- }
}
@Override
@@ -205,7 +205,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
timestampedCollector = null;
context = null;
windowAssignerContext = null;
- mergingWindowsByKey = null;
}
@Override
@@ -214,7 +213,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
timestampedCollector = null;
context = null;
windowAssignerContext = null;
- mergingWindowsByKey = null;
}
@Override
@@ -273,8 +271,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
- AppendingState<IN, ACC> windowState = getPartitionedState(
- stateWindow, windowSerializer, windowStateDescriptor);
+ AppendingState<IN, ACC> windowState =
+ getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
windowState.add(element.getValue());
context.key = key;
@@ -299,6 +297,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
registerCleanupTimer(actualWindow);
}
}
+
+ // need to make sure to update the merging state in state
+ mergingWindows.persist();
} else {
for (W window: elementWindows) {
@@ -307,8 +308,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
continue;
}
- AppendingState<IN, ACC> windowState = getPartitionedState(
- window, windowSerializer, windowStateDescriptor);
+ AppendingState<IN, ACC> windowState =
+ getPartitionedState(window, windowSerializer, windowStateDescriptor);
windowState.add(element.getValue());
context.key = key;
@@ -423,6 +424,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowState.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
+ mergingWindows.persist();
}
context.clear();
}
@@ -440,22 +442,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
/**
* Retrieves the {@link MergingWindowSet} for the currently active key.
* The caller must ensure that the correct key is set in the state backend.
+ *
+ * <p>The caller must also ensure to properly persist changes to state using
+ * {@link MergingWindowSet#persist()}.
*/
- @SuppressWarnings("unchecked")
protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
- MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getKeyedStateBackend().getCurrentKey());
- if (mergingWindows == null) {
- // try to retrieve from state
-
- TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
- ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
- ListState<Tuple2<W, W>> mergeState = getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
+ ListState<Tuple2<W, W>> mergeState =
+ getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergingWindowsDescriptor);
- mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
- mergeState.clear();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ MergingWindowSet<W> mergingWindows = new MergingWindowSet<>((MergingWindowAssigner) windowAssigner, mergeState);
- mergingWindowsByKey.put((K) getKeyedStateBackend().getCurrentKey(), mergingWindows);
- }
return mergingWindows;
}
@@ -718,30 +715,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
// ------------------------------------------------------------------------
- // Checkpointing
- // ------------------------------------------------------------------------
-
- @Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- super.snapshotState(context);
- if (mergingWindowsByKey != null) {
- TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
- ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
- for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
- setCurrentKey(key.getKey());
- ListState<Tuple2<W, W>> mergeState = getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
- mergeState.clear();
- key.getValue().persist(mergeState);
- }
- }
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- super.initializeState(context);
- }
-
- // ------------------------------------------------------------------------
// Getters for testing
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ca68d2e0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index e2cb6c8..7c1fa93 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -17,10 +17,14 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing;
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.junit.Test;
+import org.mockito.Matchers;
import java.util.Collection;
@@ -31,6 +35,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.*;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
/**
* Tests for verifying that {@link MergingWindowSet} correctly merges windows in various situations
@@ -40,7 +46,11 @@ public class MergingWindowSetTest {
@Test
public void testIncrementalMerging() throws Exception {
- MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+ @SuppressWarnings("unchecked")
+ ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+ MergingWindowSet<TimeWindow> windowSet =
+ new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
TestingMergeFunction mergeFunction = new TestingMergeFunction();
@@ -140,7 +150,10 @@ public class MergingWindowSetTest {
@Test
public void testLateMerging() throws Exception {
- MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+ @SuppressWarnings("unchecked")
+ ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+ MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
TestingMergeFunction mergeFunction = new TestingMergeFunction();
@@ -210,7 +223,10 @@ public class MergingWindowSetTest {
*/
@Test
public void testMergeLargeWindowCoveringSingleWindow() throws Exception {
- MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+ @SuppressWarnings("unchecked")
+ ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+ MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
TestingMergeFunction mergeFunction = new TestingMergeFunction();
@@ -234,7 +250,10 @@ public class MergingWindowSetTest {
*/
@Test
public void testMergeLargeWindowCoveringMultipleWindows() throws Exception {
- MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+ @SuppressWarnings("unchecked")
+ ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+ MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
TestingMergeFunction mergeFunction = new TestingMergeFunction();
@@ -265,9 +284,65 @@ public class MergingWindowSetTest {
containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(10, 13)),
containsInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 13))));
assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(1, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
+ }
+ @Test
+ public void testRestoreFromState() throws Exception {
+ @SuppressWarnings("unchecked")
+ ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+ when(mockState.get()).thenReturn(Lists.newArrayList(
+ new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(42, 17)),
+ new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(3, 4))
+ ));
+
+ MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
+
+ assertEquals(new TimeWindow(42, 17), windowSet.getStateWindow(new TimeWindow(17, 42)));
+ assertEquals(new TimeWindow(3, 4), windowSet.getStateWindow(new TimeWindow(1, 2)));
}
+ @Test
+ public void testPersist() throws Exception {
+ @SuppressWarnings("unchecked")
+ ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+ MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
+
+ TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+ windowSet.addWindow(new TimeWindow(1, 2), mergeFunction);
+ windowSet.addWindow(new TimeWindow(17, 42), mergeFunction);
+
+ assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(1, 2)));
+ assertEquals(new TimeWindow(17, 42), windowSet.getStateWindow(new TimeWindow(17, 42)));
+
+ windowSet.persist();
+
+ verify(mockState).add(eq(new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(1, 2))));
+ verify(mockState).add(eq(new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(17, 42))));
+
+ verify(mockState, times(2)).add(Matchers.<Tuple2<TimeWindow, TimeWindow>>anyObject());
+ }
+
+ @Test
+ public void testPersistOnlyIfHaveUpdates() throws Exception {
+ @SuppressWarnings("unchecked")
+ ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+ when(mockState.get()).thenReturn(Lists.newArrayList(
+ new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(42, 17)),
+ new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(3, 4))
+ ));
+
+ MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
+
+ assertEquals(new TimeWindow(42, 17), windowSet.getStateWindow(new TimeWindow(17, 42)));
+ assertEquals(new TimeWindow(3, 4), windowSet.getStateWindow(new TimeWindow(1, 2)));
+
+ windowSet.persist();
+
+ verify(mockState, times(0)).add(Matchers.<Tuple2<TimeWindow, TimeWindow>>anyObject());
+
+ }
private static class TestingMergeFunction implements MergingWindowSet.MergeFunction<TimeWindow> {
private TimeWindow target = null;