You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/01 16:49:25 UTC

flink git commit: [FLINK-4884] Eagerly Store MergingWindowSet in State in WindowOperator

Repository: flink
Updated Branches:
  refs/heads/master cfb3790fb -> ca68d2e07


[FLINK-4884] Eagerly Store MergingWindowSet in State in WindowOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca68d2e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca68d2e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca68d2e0

Branch: refs/heads/master
Commit: ca68d2e074eee66066377c3ebf69c0e15e9956d0
Parents: cfb3790
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 21 17:55:53 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Nov 1 14:02:54 2016 +0100

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       |  2 +
 .../operators/windowing/MergingWindowSet.java   | 56 +++++++------
 .../operators/windowing/WindowOperator.java     | 77 ++++++------------
 .../windowing/MergingWindowSetTest.java         | 83 +++++++++++++++++++-
 4 files changed, 140 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca68d2e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 2f4dbde..f9b409e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -168,6 +168,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				}
 			}
 
+			mergingWindows.persist();
 		} else {
 			for (W window : elementWindows) {
 
@@ -308,6 +309,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 		windowState.clear();
 		if (mergingWindows != null) {
 			mergingWindows.retireWindow(window);
+			mergingWindows.persist();
 		}
 		context.clear();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca68d2e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index 4e19c31..06cacad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -58,39 +58,51 @@ public class MergingWindowSet<W extends Window> {
 	 * we are incrementally merging windows starting from some window we keep that starting
 	 * window as the state window to prevent costly state juggling.
 	 */
-	private final Map<W, W> windows;
+	private final Map<W, W> mapping;
 
 	/**
-	 * Our window assigner.
+	 * Mapping when we created the {@code MergingWindowSet}. We use this to decide whether
+	 * we need to persist any changes to state.
 	 */
-	private final MergingWindowAssigner<?, W> windowAssigner;
+	private final Map<W, W> initialMapping;
+
+	private final ListState<Tuple2<W, W>> state;
 
 	/**
-	 * Creates a new {@link MergingWindowSet} that uses the given {@link MergingWindowAssigner}.
+	 * Our window assigner.
 	 */
-	public MergingWindowSet(MergingWindowAssigner<?, W> windowAssigner) {
-		this.windowAssigner = windowAssigner;
-		windows = new HashMap<>();
-	}
+	private final MergingWindowAssigner<?, W> windowAssigner;
 
 	/**
 	 * Restores a {@link MergingWindowSet} from the given state.
 	 */
 	public MergingWindowSet(MergingWindowAssigner<?, W> windowAssigner, ListState<Tuple2<W, W>> state) throws Exception {
 		this.windowAssigner = windowAssigner;
-		windows = new HashMap<>();
+		mapping = new HashMap<>();
 
 		Iterable<Tuple2<W, W>> windowState = state.get();
 		if (windowState != null) {
 			for (Tuple2<W, W> window: windowState) {
-				windows.put(window.f0, window.f1);
+				mapping.put(window.f0, window.f1);
 			}
 		}
+
+		this.state = state;
+
+		initialMapping = new HashMap<>();
+		initialMapping.putAll(mapping);
 	}
 
-	public void persist(ListState<Tuple2<W, W>> state) throws Exception {
-		for (Map.Entry<W, W> window: windows.entrySet()) {
-			state.add(new Tuple2<>(window.getKey(), window.getValue()));
+	/**
+	 * Persists the current mapping to the state that was supplied at construction, but only
+	 * if the mapping differs from the one that was read at initialization.
+	 */
+	public void persist() throws Exception {
+		if (!mapping.equals(initialMapping)) {
+			state.clear();
+			for (Map.Entry<W, W> window : mapping.entrySet()) {
+				state.add(new Tuple2<>(window.getKey(), window.getValue()));
+			}
 		}
 	}
 
@@ -103,7 +115,7 @@ public class MergingWindowSet<W extends Window> {
 	 * @param window The window for which to get the state window.
 	 */
 	public W getStateWindow(W window) {
-		return windows.get(window);
+		return mapping.get(window);
 	}
 
 	/**
@@ -112,7 +124,7 @@ public class MergingWindowSet<W extends Window> {
 	 * @param window The {@code Window} to remove.
 	 */
 	public void retireWindow(W window) {
-		W removed = this.windows.remove(window);
+		W removed = this.mapping.remove(window);
 		if (removed == null) {
 			throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
 		}
@@ -143,7 +155,7 @@ public class MergingWindowSet<W extends Window> {
 
 		List<W> windows = new ArrayList<>();
 
-		windows.addAll(this.windows.keySet());
+		windows.addAll(this.mapping.keySet());
 		windows.add(newWindow);
 
 		final Map<W, Collection<W>> mergeResults = new HashMap<>();
@@ -173,18 +185,18 @@ public class MergingWindowSet<W extends Window> {
 
 			// pick any of the merged windows and choose that window's state window
 			// as the state window for the merge result
-			W mergedStateWindow = this.windows.get(mergedWindows.iterator().next());
+			W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());
 
 			// figure out the state windows that we are merging
 			List<W> mergedStateWindows = new ArrayList<>();
 			for (W mergedWindow: mergedWindows) {
-				W res = this.windows.remove(mergedWindow);
+				W res = this.mapping.remove(mergedWindow);
 				if (res != null) {
 					mergedStateWindows.add(res);
 				}
 			}
 
-			this.windows.put(mergeResult, mergedStateWindow);
+			this.mapping.put(mergeResult, mergedStateWindow);
 
 			// don't put the target state window into the merged windows
 			mergedStateWindows.remove(mergedStateWindow);
@@ -195,14 +207,14 @@ public class MergingWindowSet<W extends Window> {
 			if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
 				mergeFunction.merge(mergeResult,
 						mergedWindows,
-						this.windows.get(mergeResult),
+						this.mapping.get(mergeResult),
 						mergedStateWindows);
 			}
 		}
 
 		// the new window created a new, self-contained window without merging
 		if (resultWindow.equals(newWindow) && mergeResults.isEmpty()) {
-			this.windows.put(resultWindow, resultWindow);
+			this.mapping.put(resultWindow, resultWindow);
 		}
 
 		return resultWindow;
@@ -229,7 +241,7 @@ public class MergingWindowSet<W extends Window> {
 	@Override
 	public String toString() {
 		return "MergingWindowSet{" +
-				"windows=" + windows +
+				"windows=" + mapping +
 				'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca68d2e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b319828..c465767 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -36,8 +36,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.Triggerable;
@@ -57,8 +55,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -104,6 +100,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
 
+	protected final ListStateDescriptor<Tuple2<W, W>> mergingWindowsDescriptor;
+
 	/**
 	 * For serializing the key in checkpoints.
 	 */
@@ -143,8 +141,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	private transient InternalTimerService<W> internalTimerService;
 
-	protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey;
-
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 	 */
@@ -173,6 +169,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		this.trigger = checkNotNull(trigger);
 		this.allowedLateness = allowedLateness;
 
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
+			mergingWindowsDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+		} else {
+			mergingWindowsDescriptor = null;
+		}
+
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
@@ -193,10 +197,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				return internalTimerService.currentProcessingTime();
 			}
 		};
-
-		if (windowAssigner instanceof MergingWindowAssigner) {
-			mergingWindowsByKey = new HashMap<>();
-		}
 	}
 
 	@Override
@@ -205,7 +205,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		timestampedCollector = null;
 		context = null;
 		windowAssignerContext = null;
-		mergingWindowsByKey = null;
 	}
 
 	@Override
@@ -214,7 +213,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		timestampedCollector = null;
 		context = null;
 		windowAssignerContext = null;
-		mergingWindowsByKey = null;
 	}
 
 	@Override
@@ -273,8 +271,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
 				}
 
-				AppendingState<IN, ACC> windowState = getPartitionedState(
-					stateWindow, windowSerializer, windowStateDescriptor);
+				AppendingState<IN, ACC> windowState =
+						getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 				windowState.add(element.getValue());
 
 				context.key = key;
@@ -299,6 +297,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					registerCleanupTimer(actualWindow);
 				}
 			}
+
+			// make sure that changes to the merging window set are written back to state
+			mergingWindows.persist();
 		} else {
 			for (W window: elementWindows) {
 
@@ -307,8 +308,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					continue;
 				}
 
-				AppendingState<IN, ACC> windowState = getPartitionedState(
-					window, windowSerializer, windowStateDescriptor);
+				AppendingState<IN, ACC> windowState =
+						getPartitionedState(window, windowSerializer, windowStateDescriptor);
 				windowState.add(element.getValue());
 
 				context.key = key;
@@ -423,6 +424,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		windowState.clear();
 		if (mergingWindows != null) {
 			mergingWindows.retireWindow(window);
+			mergingWindows.persist();
 		}
 		context.clear();
 	}
@@ -440,22 +442,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	/**
 	 * Retrieves the {@link MergingWindowSet} for the currently active key.
 	 * The caller must ensure that the correct key is set in the state backend.
+	 *
+	 * <p>The caller must also make sure to persist any changes to state by calling
+	 * {@link MergingWindowSet#persist()}.
 	 */
-	@SuppressWarnings("unchecked")
 	protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
-		MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getKeyedStateBackend().getCurrentKey());
-		if (mergingWindows == null) {
-			// try to retrieve from state
-
-			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
-			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
-			ListState<Tuple2<W, W>> mergeState = getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
+		ListState<Tuple2<W, W>> mergeState =
+				getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergingWindowsDescriptor);
 
-			mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
-			mergeState.clear();
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		MergingWindowSet<W> mergingWindows = new MergingWindowSet<>((MergingWindowAssigner) windowAssigner, mergeState);
 
-			mergingWindowsByKey.put((K) getKeyedStateBackend().getCurrentKey(), mergingWindows);
-		}
 		return mergingWindows;
 	}
 
@@ -718,30 +715,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	// ------------------------------------------------------------------------
-	//  Checkpointing
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void snapshotState(StateSnapshotContext context) throws Exception {
-		super.snapshotState(context);
-		if (mergingWindowsByKey != null) {
-			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
-			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
-			for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
-				setCurrentKey(key.getKey());
-				ListState<Tuple2<W, W>> mergeState = getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
-				mergeState.clear();
-				key.getValue().persist(mergeState);
-			}
-		}
-	}
-
-	@Override
-	public void initializeState(StateInitializationContext context) throws Exception {
-		super.initializeState(context);
-	}
-
-	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca68d2e0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index e2cb6c8..7c1fa93 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -17,10 +17,14 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.junit.Test;
+import org.mockito.Matchers;
 
 import java.util.Collection;
 
@@ -31,6 +35,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
 
 /**
  * Tests for verifying that {@link MergingWindowSet} correctly merges windows in various situations
@@ -40,7 +46,11 @@ public class MergingWindowSetTest {
 
 	@Test
 	public void testIncrementalMerging() throws Exception {
-		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+		@SuppressWarnings("unchecked")
+		ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+		MergingWindowSet<TimeWindow> windowSet =
+				new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
 
 		TestingMergeFunction mergeFunction = new TestingMergeFunction();
 
@@ -140,7 +150,10 @@ public class MergingWindowSetTest {
 
 	@Test
 	public void testLateMerging() throws Exception {
-		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+		@SuppressWarnings("unchecked")
+		ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
 
 		TestingMergeFunction mergeFunction = new TestingMergeFunction();
 
@@ -210,7 +223,10 @@ public class MergingWindowSetTest {
 	 */
 	@Test
 	public void testMergeLargeWindowCoveringSingleWindow() throws Exception {
-		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+		@SuppressWarnings("unchecked")
+		ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
 
 		TestingMergeFunction mergeFunction = new TestingMergeFunction();
 
@@ -234,7 +250,10 @@ public class MergingWindowSetTest {
 	 */
 	@Test
 	public void testMergeLargeWindowCoveringMultipleWindows() throws Exception {
-		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+		@SuppressWarnings("unchecked")
+		ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
 
 		TestingMergeFunction mergeFunction = new TestingMergeFunction();
 
@@ -265,9 +284,65 @@ public class MergingWindowSetTest {
 				containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(10, 13)),
 				containsInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 13))));
 		assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(1, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
+	}
 
+	@Test
+	public void testRestoreFromState() throws Exception {
+		@SuppressWarnings("unchecked")
+		ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+		when(mockState.get()).thenReturn(Lists.newArrayList(
+				new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(42, 17)),
+				new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(3, 4))
+		));
+
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
+
+		assertEquals(new TimeWindow(42, 17), windowSet.getStateWindow(new TimeWindow(17, 42)));
+		assertEquals(new TimeWindow(3, 4), windowSet.getStateWindow(new TimeWindow(1, 2)));
 	}
 
+	@Test
+	public void testPersist() throws Exception {
+		@SuppressWarnings("unchecked")
+		ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
+
+		TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+		windowSet.addWindow(new TimeWindow(1, 2), mergeFunction);
+		windowSet.addWindow(new TimeWindow(17, 42), mergeFunction);
+
+		assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(1, 2)));
+		assertEquals(new TimeWindow(17, 42), windowSet.getStateWindow(new TimeWindow(17, 42)));
+
+		windowSet.persist();
+
+		verify(mockState).add(eq(new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(1, 2))));
+		verify(mockState).add(eq(new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(17, 42))));
+
+		verify(mockState, times(2)).add(Matchers.<Tuple2<TimeWindow, TimeWindow>>anyObject());
+	}
+
+	@Test
+	public void testPersistOnlyIfHaveUpdates() throws Exception {
+		@SuppressWarnings("unchecked")
+		ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+		when(mockState.get()).thenReturn(Lists.newArrayList(
+				new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(42, 17)),
+				new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(3, 4))
+		));
+
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState);
+
+		assertEquals(new TimeWindow(42, 17), windowSet.getStateWindow(new TimeWindow(17, 42)));
+		assertEquals(new TimeWindow(3, 4), windowSet.getStateWindow(new TimeWindow(1, 2)));
+
+		windowSet.persist();
+
+		verify(mockState, times(0)).add(Matchers.<Tuple2<TimeWindow, TimeWindow>>anyObject());
+
+	}
 
 	private static class TestingMergeFunction implements MergingWindowSet.MergeFunction<TimeWindow> {
 		private TimeWindow target = null;