You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/25 10:20:11 UTC

flink git commit: [FLINK-3740] Make Session Window State Checkpointed

Repository: flink
Updated Branches:
  refs/heads/master e48a5f19e -> f2f5bd5be


[FLINK-3740] Make Session Window State Checkpointed


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2f5bd5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2f5bd5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2f5bd5b

Branch: refs/heads/master
Commit: f2f5bd5bed2f737b8418be09359691e899462184
Parents: e48a5f1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Apr 12 16:27:18 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Apr 25 10:18:40 2016 +0200

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       |  17 +--
 .../operators/windowing/MergingWindowSet.java   |  20 +++
 .../operators/windowing/WindowOperator.java     |  86 ++++++++++--
 .../operators/windowing/WindowOperatorTest.java | 131 +++++++++++++------
 .../util/OneInputStreamOperatorTestHarness.java |  37 +++++-
 5 files changed, 229 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 1e4e453..84ee0b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -82,15 +82,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(),
 				element.getTimestamp());
 
-		K key = (K) getStateBackend().getCurrentKey();
+		final K key = (K) getStateBackend().getCurrentKey();
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
-			MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get(getStateBackend().getCurrentKey());
-			if (mergingWindows == null) {
-				mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner);
-				mergingWindowsByKey.put(key, mergingWindows);
-			}
 
+			MergingWindowSet<W> mergingWindows = getMergingWindowSet();
 
 			for (W window : elementWindows) {
 				// If there is a merge, it can only result in a window that contains our new
@@ -107,6 +103,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 							public void merge(W mergeResult,
 									Collection<W> mergedWindows, W stateWindowResult,
 									Collection<W> mergedStateWindows) throws Exception {
+								context.key = key;
 								context.window = mergeResult;
 
 								// store for later use
@@ -141,7 +138,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult,
 						mergeTriggerResult.f0);
 
-				processTriggerResult(combinedTriggerResult, key, actualWindow);
+				processTriggerResult(combinedTriggerResult, actualWindow);
 			}
 
 		} else {
@@ -157,14 +154,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				context.window = window;
 				TriggerResult triggerResult = context.onElement(element);
 
-				processTriggerResult(triggerResult, key, window);
+				processTriggerResult(triggerResult, window);
 			}
 		}
 	}
 
 	@Override
 	@SuppressWarnings("unchecked,rawtypes")
-	protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws Exception {
+	protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
 		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
 			// do nothing
 			return;
@@ -175,7 +172,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 		MergingWindowSet<W> mergingWindows = null;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
-			mergingWindows = mergingWindowsByKey.get(key);
+			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(window);
 			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index 7ef1af4..49a2017 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.slf4j.Logger;
@@ -73,6 +75,24 @@ public class MergingWindowSet<W extends Window> {
 	}
 
 	/**
+	 * Restores a {@link MergingWindowSet} from the given state.
+	 */
+	public MergingWindowSet(MergingWindowAssigner<?, W> windowAssigner, ListState<Tuple2<W, W>> state) throws Exception {
+		this.windowAssigner = windowAssigner;
+		windows = new HashMap<>();
+
+		for (Tuple2<W, W> window: state.get()) {
+			windows.put(window.f0, window.f1);
+		}
+	}
+
+	public void persist(ListState<Tuple2<W, W>> state) throws Exception {
+		for (Map.Entry<W, W> window: windows.entrySet()) {
+			state.add(new Tuple2<>(window.getKey(), window.getValue()));
+		}
+	}
+
+	/**
 	 * Returns the state window for the given in-flight {@code Window}. The state window is the
 	 * {@code Window} in which we keep the actual state of a given in-flight window. Windows
 	 * might expand but we keep to original state window for keeping the elements of the window

http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index c106e70..919cee7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -29,10 +31,13 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
@@ -224,6 +229,25 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	@Override
 	public final void close() throws Exception {
 		super.close();
+		timestampedCollector = null;
+		watermarkTimers = null;
+		watermarkTimersQueue = null;
+		processingTimeTimers = null;
+		processingTimeTimersQueue = null;
+		context = null;
+		mergingWindowsByKey = null;
+	}
+
+	@Override
+	public void dispose() {
+		super.dispose();
+		timestampedCollector = null;
+		watermarkTimers = null;
+		watermarkTimersQueue = null;
+		processingTimeTimers = null;
+		processingTimeTimersQueue = null;
+		context = null;
+		mergingWindowsByKey = null;
 	}
 
 	@Override
@@ -231,15 +255,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
 
-		K key = (K) getStateBackend().getCurrentKey();
+		final K key = (K) getStateBackend().getCurrentKey();
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
-			MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get(getStateBackend().getCurrentKey());
-			if (mergingWindows == null) {
-				mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner);
-				mergingWindowsByKey.put(key, mergingWindows);
-			}
-
+			MergingWindowSet<W> mergingWindows = getMergingWindowSet();
 
 			for (W window: elementWindows) {
 				// If there is a merge, it can only result in a window that contains our new
@@ -255,6 +274,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					public void merge(W mergeResult,
 							Collection<W> mergedWindows, W stateWindowResult,
 							Collection<W> mergedStateWindows) throws Exception {
+						context.key = key;
 						context.window = mergeResult;
 
 						// store for later use
@@ -286,7 +306,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
 
-				processTriggerResult(combinedTriggerResult, key, actualWindow);
+				processTriggerResult(combinedTriggerResult, actualWindow);
 			}
 
 		} else {
@@ -301,13 +321,40 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				context.window = window;
 				TriggerResult triggerResult = context.onElement(element);
 
-				processTriggerResult(triggerResult, key, window);
+				processTriggerResult(triggerResult, window);
 			}
 		}
 	}
 
+	/**
+	 * Retrieves the {@link MergingWindowSet} for the currently active key. The caller must
+	 * ensure that the correct key is set in the state backend.
+	 */
+	@SuppressWarnings("unchecked")
+	protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
+		MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
+		if (mergingWindows == null) {
+			// try to retrieve from state
+
+			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
+			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+			ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+
+			mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
+			mergeState.clear();
+
+			mergingWindowsByKey.put((K) getStateBackend().getCurrentKey(), mergingWindows);
+		}
+		return mergingWindows;
+	}
+
+
+	/**
+	 * Process {@link TriggerResult} for the currently active key and the given window. The caller
+	 * must ensure that the correct key is set in the state backend and the context object.
+	 */
 	@SuppressWarnings("unchecked")
-	protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws Exception {
+	protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
 		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
 			// do nothing
 			return;
@@ -318,7 +365,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		MergingWindowSet<W> mergingWindows = null;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
-			mergingWindows = mergingWindowsByKey.get(key);
+			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(window);
 			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 
@@ -366,7 +413,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				context.window = timer.window;
 				setKeyContext(timer.key);
 				TriggerResult triggerResult = context.onEventTime(timer.timestamp);
-				processTriggerResult(triggerResult, context.key, context.window);
+				processTriggerResult(triggerResult, context.window);
 			} else {
 				fire = false;
 			}
@@ -389,7 +436,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				context.window = timer.window;
 				setKeyContext(timer.key);
 				TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
-				processTriggerResult(triggerResult, context.key, context.window);
+				processTriggerResult(triggerResult, context.window);
 			} else {
 				fire = false;
 			}
@@ -604,7 +651,20 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	// ------------------------------------------------------------------------
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+
+		if (mergingWindowsByKey != null) {
+			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
+			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+			for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
+				setKeyContext(key.getKey());
+				ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+				mergeState.clear();
+				key.getValue().persist(mergeState);
+			}
+		}
+
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 
 		AbstractStateBackend.CheckpointStateOutputView out =

http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 642a16b..233131d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -36,22 +35,23 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
@@ -109,6 +109,13 @@ public class WindowOperatorTest {
 		expectedOutput.add(new Watermark(2999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.restore(snapshot, 10L);
+		testHarness.open();
+
 		testHarness.processWatermark(new Watermark(initialTime + 3999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
 		expectedOutput.add(new Watermark(3999));
@@ -145,8 +152,8 @@ public class WindowOperatorTest {
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
 		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-			new SumReducer(),
-			inputType.createSerializer(new ExecutionConfig()));
+				new SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
 				SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
@@ -164,6 +171,7 @@ public class WindowOperatorTest {
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
+		testHarness.setup();
 		testHarness.open();
 
 		testSlidingEventTimeWindows(testHarness);
@@ -182,21 +190,21 @@ public class WindowOperatorTest {
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
 		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
-			inputType.createSerializer(new ExecutionConfig()));
+				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-			SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
-			new TimeWindow.Serializer(),
-			new TupleKeySelector(),
-			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-			stateDesc,
-			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
-			EventTimeTrigger.create());
+				SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
+				EventTimeTrigger.create());
 
 		operator.setInputType(inputType, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
+				new OneInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -206,7 +214,8 @@ public class WindowOperatorTest {
 
 		testHarness.close();
 
-		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+		// the shared test body already closes the harness once during its snapshot/restore cycle, so expect two calls
+		Assert.assertEquals("Close was not called.", 2, closeCalled.get());
 	}
 
 	private void testTumblingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
@@ -237,6 +246,13 @@ public class WindowOperatorTest {
 		expectedOutput.add(new Watermark(1999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.restore(snapshot, 10L);
+		testHarness.open();
+
 		testHarness.processWatermark(new Watermark(initialTime + 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
@@ -266,7 +282,7 @@ public class WindowOperatorTest {
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 	}
 
-		@Test
+	@Test
 	@SuppressWarnings("unchecked")
 	public void testTumblingEventTimeWindowsReduce() throws Exception {
 		closeCalled.set(0);
@@ -276,8 +292,8 @@ public class WindowOperatorTest {
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
 		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-			new SumReducer(),
-			inputType.createSerializer(new ExecutionConfig()));
+				new SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
 				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
@@ -312,21 +328,21 @@ public class WindowOperatorTest {
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
 		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
-			inputType.createSerializer(new ExecutionConfig()));
+				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-			TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-			new TimeWindow.Serializer(),
-			new TupleKeySelector(),
-			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-			stateDesc,
-			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
-			EventTimeTrigger.create());
+				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
+				EventTimeTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator);
+				new OneInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -336,7 +352,8 @@ public class WindowOperatorTest {
 
 		testHarness.close();
 
-		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+		// the shared test body already closes the harness once during its snapshot/restore cycle, so expect two calls
+		Assert.assertEquals("Close was not called.", 2, closeCalled.get());
 	}
 
 	@Test
@@ -379,6 +396,14 @@ public class WindowOperatorTest {
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.restore(snapshot, 10L);
+		testHarness.open();
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 5501));
@@ -390,6 +415,7 @@ public class WindowOperatorTest {
 
 		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), initialTime + 5499));
 		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), initialTime + 5499));
+
 		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), initialTime + 9049));
 		expectedOutput.add(new Watermark(initialTime + 12000));
 
@@ -444,6 +470,13 @@ public class WindowOperatorTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
 
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.restore(snapshot, 10L);
+		testHarness.open();
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
@@ -518,6 +551,14 @@ public class WindowOperatorTest {
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.restore(snapshot, 10L);
+		testHarness.open();
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6000));
@@ -610,6 +651,14 @@ public class WindowOperatorTest {
 		// add elements out-of-order
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 1000));
+
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.restore(snapshot, 10L);
+		testHarness.open();
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 2500));
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
@@ -637,8 +686,8 @@ public class WindowOperatorTest {
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
 		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-			new SumReducer(),
-			inputType.createSerializer(new ExecutionConfig()));
+				new SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
 				GlobalWindows.create(),
@@ -727,8 +776,8 @@ public class WindowOperatorTest {
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
 		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-			new SumReducer(),
-			inputType.createSerializer(new ExecutionConfig()));
+				new SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
 				GlobalWindows.create(),
@@ -764,6 +813,14 @@ public class WindowOperatorTest {
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.restore(snapshot, 10L);
+		testHarness.open();
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
 
 
@@ -793,7 +850,7 @@ public class WindowOperatorTest {
 		private static final long serialVersionUID = 1L;
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-			Tuple2<String, Integer> value2) throws Exception {
+				Tuple2<String, Integer> value2) throws Exception {
 			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
 		}
 	}
@@ -818,9 +875,9 @@ public class WindowOperatorTest {
 
 		@Override
 		public void apply(String key,
-			W window,
-			Iterable<Tuple2<String, Integer>> input,
-			Collector<Tuple2<String, Integer>> out) throws Exception {
+				W window,
+				Iterable<Tuple2<String, Integer>> input,
+				Collector<Tuple2<String, Integer>> out) throws Exception {
 
 			if (!openCalled) {
 				fail("Open was not called");

http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 46e74e7..c5f983a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -69,6 +70,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	final Object checkpointLock;
 
 	StreamTask<?, ?> mockTask;
+
+	/**
+	 * Whether setup() was called on the operator. This is reset when calling close().
+	 */
+	private boolean setupCalled = false;
 	
 	
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
@@ -151,20 +157,47 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}
+	 * Calls
+	 * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}.
 	 */
-	public void open() throws Exception {
+	public void setup() throws Exception {
 		operator.setup(mockTask, config, new MockOutput());
+		setupCalled = true;
+	}
 
+	/**
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
+	 * calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
+	 * if it was not called before.
+	 */
+	public void open() throws Exception {
+		if (!setupCalled) {
+			setup();
+		}
 		operator.open();
 	}
 
 	/**
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotOperatorState(long, long)}.
+	 */
+	public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception {
+		return operator.snapshotOperatorState(checkpointId, timestamp);
+	}
+
+	/**
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState, long)}.
+	 */
+	public void restore(StreamTaskState snapshot, long recoveryTimestamp) throws Exception {
+		operator.restoreState(snapshot, recoveryTimestamp);
+	}
+
+	/**
 	 * Calls close and dispose on the operator.
 	 */
 	public void close() throws Exception {
 		operator.close();
 		operator.dispose();
+		setupCalled = false;
 	}
 
 	public void processElement(StreamRecord<IN> element) throws Exception {