You are viewing a plain-text version of this content; the canonical link to it is not included in this copy.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/20 13:58:17 UTC

flink git commit: [FLINK-9902][tests] Improve and refactor window checkpointing IT cases

Repository: flink
Updated Branches:
  refs/heads/master 01cf808ee -> d309e61e2


[FLINK-9902][tests] Improve and refactor window checkpointing IT cases

This closes #6376.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d309e61e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d309e61e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d309e61e

Branch: refs/heads/master
Commit: d309e61e2bae170872b43cf60bd2fd9fef77814c
Parents: 01cf808
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Jul 18 23:29:56 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jul 20 15:57:49 2018 +0200

----------------------------------------------------------------------
 .../EventTimeAllWindowCheckpointingITCase.java  | 296 ++-------------
 .../EventTimeWindowCheckpointingITCase.java     | 375 +++++--------------
 .../WindowCheckpointingITCase.java              | 229 +++--------
 .../test/checkpointing/utils/FailingSource.java | 155 ++++++++
 .../flink/test/checkpointing/utils/IntType.java |  38 ++
 .../checkpointing/utils/ValidatingSink.java     | 128 +++++++
 .../flink/test/util/SuccessException.java       |   2 +-
 7 files changed, 505 insertions(+), 718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 9e14b26..5dc2aa0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -26,31 +26,23 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.checkpointing.utils.FailingSource;
+import org.apache.flink.test.checkpointing.utils.IntType;
+import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -88,20 +80,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		final int numElementsPerKey = 3000;
 		final int windowSize = 100;
 		final int numKeys = 1;
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(numKeys,
-							numElementsPerKey,
-							numElementsPerKey / 3))
+					.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
 					.rebalance()
 					.timeWindowAll(Time.of(windowSize, MILLISECONDS))
 					.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
@@ -133,9 +122,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
 						}
 					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+					new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize)))
+				.setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			env.execute("Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -149,7 +141,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		final int windowSize = 1000;
 		final int windowSlide = 100;
 		final int numKeys = 1;
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -160,7 +151,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+				.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
 					.rebalance()
 					.timeWindowAll(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
 					.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
@@ -192,9 +183,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
 						}
 					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+					new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide)))
+				.setParallelism(1);
 
-			tryExecute(env, "Sliding Window Test");
+			env.execute("Sliding Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -207,20 +201,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		final int numElementsPerKey = 3000;
 		final int windowSize = 100;
 		final int numKeys = 1;
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(numKeys,
-							numElementsPerKey,
-							numElementsPerKey / 3))
+				.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
 					.rebalance()
 					.timeWindowAll(Time.of(windowSize, MILLISECONDS))
 					.reduce(
@@ -261,9 +252,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 							}
 						}
 					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+					new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize)))
+				.setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			env.execute("Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -276,20 +270,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		final int numElementsPerKey = 3000;
 		final int windowSize = 100;
 		final int numKeys = 1;
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(numKeys,
-							numElementsPerKey,
-							numElementsPerKey / 3))
+				.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
 					.rebalance()
 					.timeWindowAll(Time.of(windowSize, MILLISECONDS))
 					.fold(new Tuple4<>(0L, 0L, 0L, new IntType(0)),
@@ -329,9 +320,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 									}
 								}
 							})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+					new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize)))
+				.setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			env.execute("Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -345,20 +339,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		final int windowSize = 1000;
 		final int windowSlide = 100;
 		final int numKeys = 1;
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(numKeys,
-							numElementsPerKey,
-							numElementsPerKey / 3))
+				.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
 					.rebalance()
 					.timeWindowAll(Time.of(windowSize, MILLISECONDS),
 							Time.of(windowSlide, MILLISECONDS))
@@ -400,229 +391,16 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 							}
 						}
 					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+					new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide)))
+				.setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			env.execute("Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements ListCheckpointed<Integer>, CheckpointListener {
-		private static volatile boolean failedBefore = false;
-
-		private final int numKeys;
-		private final int numElementsToEmit;
-		private final int failureAfterNumElements;
-
-		private volatile int numElementsEmitted;
-		private volatile int numSuccessfulCheckpoints;
-		private volatile boolean running = true;
-
-		private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
-			this.numKeys = numKeys;
-			this.numElementsToEmit = numElementsToEmitPerKey;
-			this.failureAfterNumElements = failureAfterNumElements;
-		}
-
-		@Override
-		public void open(Configuration parameters) {
-			// non-parallel source
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
-			// we loop longer than we have elements, to permit delayed checkpoints
-			// to still cause a failure
-			while (running) {
-
-				if (!failedBefore) {
-					// delay a bit, if we have not failed before
-					Thread.sleep(1);
-					if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
-						// cause a failure if we have not failed before and have reached
-						// enough completed checkpoints and elements
-						failedBefore = true;
-						throw new Exception("Artificial Failure");
-					}
-				}
-
-				if (numElementsEmitted < numElementsToEmit &&
-						(failedBefore || numElementsEmitted <= failureAfterNumElements)) {
-					// the function failed before, or we are in the elements before the failure
-					synchronized (ctx.getCheckpointLock()) {
-						int next = numElementsEmitted++;
-						for (long i = 0; i < numKeys; i++) {
-							ctx.collectWithTimestamp(new Tuple2<>(i, new IntType(next)), next);
-						}
-						ctx.emitWatermark(new Watermark(next));
-					}
-				}
-				else {
-					// if our work is done, delay a bit to prevent busy waiting
-					Thread.sleep(1);
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			numSuccessfulCheckpoints++;
-		}
-
-		public static void reset() {
-			failedBefore = false;
-		}
-
-		@Override
-		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.numElementsEmitted);
-		}
-
-		@Override
-		public void restoreState(List<Integer> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			this.numElementsEmitted = state.get(0);
-		}
-	}
-
-	private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-			implements ListCheckpointed<HashMap<Long, Integer>> {
-
-		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
-		private final int numKeys;
-		private final int numWindowsExpected;
-
-		private ValidatingSink(int numKeys, int numWindowsExpected) {
-			this.numKeys = numKeys;
-			this.numWindowsExpected = numWindowsExpected;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// this sink can only work with DOP 1
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-
-			// it can happen that a checkpoint happens when the complete success state is
-			// already set. In that case we restart with the final state and would never
-			// finish because no more elements arrive.
-			if (windowCounts.size() == numKeys) {
-				boolean seenAll = true;
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount != numWindowsExpected) {
-						seenAll = false;
-						break;
-					}
-				}
-				if (seenAll) {
-					throw new SuccessException();
-				}
-			}
-		}
-
-		@Override
-		public void close() throws Exception {
-			boolean seenAll = true;
-			if (windowCounts.size() == numKeys) {
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount != numWindowsExpected) {
-						seenAll = false;
-						break;
-					}
-				}
-			}
-			assertTrue("The source must see all expected windows.", seenAll);
-		}
-
-		@Override
-		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
-			// verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
-			// the sum should be "sum (start .. end-1)"
-
-			int expectedSum = 0;
-			for (long i = value.f1; i < value.f2; i++) {
-				// only sum up positive vals, to filter out the negative start of the
-				// first sliding windows
-				if (i > 0) {
-					expectedSum += i;
-				}
-			}
-
-			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
-
-			Integer curr = windowCounts.get(value.f0);
-			if (curr != null) {
-				windowCounts.put(value.f0, curr + 1);
-			}
-			else {
-				windowCounts.put(value.f0, 1);
-			}
-
-			boolean seenAll = true;
-			if (windowCounts.size() == numKeys) {
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount < numWindowsExpected) {
-						seenAll = false;
-						break;
-					} else if (windowCount > numWindowsExpected) {
-						fail("Window count to high: " + windowCount);
-					}
-				}
-
-				if (seenAll) {
-					// exit
-					throw new SuccessException();
-				}
-
-			}
-		}
-
-		@Override
-		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.windowCounts);
-		}
-
-		@Override
-		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			this.windowCounts.putAll(state.get(0));
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Custom boxed integer type.
-	 */
-	public static class IntType {
-
-		public int value;
-
-		public IntType() {}
-
-		public IntType(int value) {
-			this.value = value;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index c3d93d7..e9a2e45 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -32,21 +32,20 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.checkpointing.utils.FailingSource;
+import org.apache.flink.test.checkpointing.utils.IntType;
+import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -65,19 +64,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM_ASYNC;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL;
 import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -255,20 +245,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		final int numElementsPerKey = numElementsPerKey();
 		final int windowSize = windowSize();
 		final int numKeys = numKeys();
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 			env.setStateBackend(this.stateBackend);
 			env.getConfig().setUseSnapshotCompression(true);
 
 			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(windowSize, MILLISECONDS))
@@ -299,12 +288,17 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 								sum += value.f1.value;
 								key = value.f0;
 							}
-							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+
+							final Tuple4<Long, Long, Long, IntType> result =
+								new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum));
+							out.collect(result);
 						}
 					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new SinkValidatorUpdateFun(numElementsPerKey),
+					new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			env.execute("Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -326,7 +320,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		final int numElementsPerKey = numElementsPerKey();
 		final int windowSize = windowSize();
 		final int numKeys = numKeys();
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -334,13 +327,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 			env.setMaxParallelism(maxParallelism);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 			env.setStateBackend(this.stateBackend);
 			env.getConfig().setUseSnapshotCompression(true);
 
 			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(windowSize, MILLISECONDS))
@@ -378,9 +371,11 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 							out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
 						}
 					})
-					.addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new CountingSinkValidatorUpdateFun(),
+					new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			env.execute("Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -394,7 +389,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		final int windowSize = windowSize();
 		final int windowSlide = windowSlide();
 		final int numKeys = numKeys();
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -402,13 +396,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 			env.setStateBackend(this.stateBackend);
 			env.getConfig().setUseSnapshotCompression(true);
 
 			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
@@ -439,12 +433,16 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 								sum += value.f1.value;
 								key = value.f0;
 							}
-							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+							final Tuple4<Long, Long, Long, IntType> output =
+								new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum));
+							out.collect(output);
 						}
 					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new SinkValidatorUpdateFun(numElementsPerKey),
+					new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			env.execute("Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -457,20 +455,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		final int numElementsPerKey = numElementsPerKey();
 		final int windowSize = windowSize();
 		final int numKeys = numKeys();
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 			env.setStateBackend(this.stateBackend);
 			env.getConfig().setUseSnapshotCompression(true);
 
 			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(windowSize, MILLISECONDS))
@@ -505,16 +502,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 							assertTrue(open);
 
 							for (Tuple2<Long, IntType> in: input) {
-								out.collect(new Tuple4<>(in.f0,
-										window.getStart(),
-										window.getEnd(),
-										in.f1));
+								final Tuple4<Long, Long, Long, IntType> output = new Tuple4<>(in.f0,
+									window.getStart(),
+									window.getEnd(),
+									in.f1);
+								out.collect(output);
 							}
 						}
 					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new SinkValidatorUpdateFun(numElementsPerKey),
+					new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			env.execute("Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -528,20 +528,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		final int windowSize = windowSize();
 		final int windowSlide = windowSlide();
 		final int numKeys = numKeys();
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 			env.setStateBackend(this.stateBackend);
 			env.getConfig().setUseSnapshotCompression(true);
 
 			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
@@ -585,9 +584,11 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 							}
 						}
 					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+					.addSink(new ValidatingSink<>(
+						new SinkValidatorUpdateFun(numElementsPerKey),
+						new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			env.execute("Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -599,151 +600,42 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements ListCheckpointed<Integer>, CheckpointListener {
-		private static volatile boolean failedBefore = false;
-
-		private final int numKeys;
-		private final int numElementsToEmit;
-		private final int failureAfterNumElements;
-
-		private volatile int numElementsEmitted;
-		private volatile int numSuccessfulCheckpoints;
-		private volatile boolean running = true;
-
-		private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
-			this.numKeys = numKeys;
-			this.numElementsToEmit = numElementsToEmitPerKey;
-			this.failureAfterNumElements = failureAfterNumElements;
-		}
-
-		@Override
-		public void open(Configuration parameters) {
-			// non-parallel source
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
-			// we loop longer than we have elements, to permit delayed checkpoints
-			// to still cause a failure
-			while (running) {
-
-				if (!failedBefore) {
-					// delay a bit, if we have not failed before
-					Thread.sleep(1);
-					if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
-						// cause a failure if we have not failed before and have reached
-						// enough completed checkpoints and elements
-						failedBefore = true;
-						throw new Exception("Artificial Failure");
-					}
-				}
-
-				if (numElementsEmitted < numElementsToEmit &&
-						(failedBefore || numElementsEmitted <= failureAfterNumElements)) {
-					// the function failed before, or we are in the elements before the failure
-					synchronized (ctx.getCheckpointLock()) {
-						int next = numElementsEmitted++;
-						for (long i = 0; i < numKeys; i++) {
-							ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
-						}
-						ctx.emitWatermark(new Watermark(next));
-					}
-				}
-				else {
-
-					// if our work is done, delay a bit to prevent busy waiting
-					Thread.sleep(1);
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			numSuccessfulCheckpoints++;
-		}
+	/**
+	 * For validating the stateful window counts.
+	 */
+	static class CountingSinkValidatorUpdateFun
+		implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {
 
 		@Override
-		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.numElementsEmitted);
-		}
+		public void updateCount(Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) {
 
-		@Override
-		public void restoreState(List<Integer> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			this.numElementsEmitted = state.get(0);
-		}
+			windowCounts.merge(value.f0, 1, (a, b) -> a + b);
 
-		public static void reset() {
-			failedBefore = false;
+			// verify the contents of that window, the contents should be:
+			// (key + num windows so far)
+			assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
 		}
 	}
 
-	private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-			implements ListCheckpointed<HashMap<Long, Integer>> {
-
-		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
-		private final int numKeys;
-		private final int numWindowsExpected;
+	//------------------------------------
 
-		private ValidatingSink(int numKeys, int numWindowsExpected) {
-			this.numKeys = numKeys;
-			this.numWindowsExpected = numWindowsExpected;
-		}
+	static class SinkValidatorUpdateFun implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {
 
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// this sink can only work with DOP 1
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		private final int elementsPerKey;
 
-			// it can happen that a checkpoint happens when the complete success state is
-			// already set. In that case we restart with the final state and would never
-			// finish because no more elements arrive.
-			if (windowCounts.size() == numKeys) {
-				boolean seenAll = true;
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount != numWindowsExpected) {
-						seenAll = false;
-						break;
-					}
-				}
-				if (seenAll) {
-					throw new SuccessException();
-				}
-			}
+		SinkValidatorUpdateFun(int elementsPerKey) {
+			this.elementsPerKey = elementsPerKey;
 		}
 
 		@Override
-		public void close() throws Exception {
-			boolean seenAll = true;
-			if (windowCounts.size() == numKeys) {
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount < numWindowsExpected) {
-						seenAll = false;
-						break;
-					}
-				}
-			}
-			assertTrue("The sink must see all expected windows.", seenAll);
-		}
-
-		@Override
-		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
+		public void updateCount(Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) {
 			// verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
 			// the sum should be "sum (start .. end-1)"
 
 			int expectedSum = 0;
-			for (long i = value.f1; i < value.f2; i++) {
+			// we shorten the range if it goes beyond elementsPerKey, because those are "incomplete" sliding windows
+			long countUntil = Math.min(value.f2, elementsPerKey);
+			for (long i = value.f1; i < countUntil; i++) {
 				// only sum up positive vals, to filter out the negative start of the
 				// first sliding windows
 				if (i > 0) {
@@ -753,142 +645,55 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 
 			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
 
-			Integer curr = windowCounts.get(value.f0);
-			if (curr != null) {
-				windowCounts.put(value.f0, curr + 1);
-			}
-			else {
-				windowCounts.put(value.f0, 1);
-			}
-
-			if (windowCounts.size() == numKeys) {
-				boolean seenAll = true;
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount < numWindowsExpected) {
-						seenAll = false;
-						break;
-					} else if (windowCount > numWindowsExpected) {
-						fail("Window count to high: " + windowCount);
-					}
-				}
-
-				if (seenAll) {
-					// exit
-					throw new SuccessException();
-				}
-
-			}
-		}
-
-		@Override
-		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.windowCounts);
-		}
-
-		@Override
-		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			windowCounts.putAll(state.get(0));
+			windowCounts.merge(value.f0, 1, (val, increment) -> val + increment);
 		}
 	}
 
-	// Sink for validating the stateful window counts
-	private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-			implements ListCheckpointed<HashMap<Long, Integer>> {
-
-		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+	static class SinkValidatorCheckFun implements ValidatingSink.ResultChecker {
 
 		private final int numKeys;
 		private final int numWindowsExpected;
 
-		private CountValidatingSink(int numKeys, int numWindowsExpected) {
+		SinkValidatorCheckFun(int numKeys, int elementsPerKey, int elementsPerWindow) {
 			this.numKeys = numKeys;
-			this.numWindowsExpected = numWindowsExpected;
+			this.numWindowsExpected = elementsPerKey / elementsPerWindow;
 		}
 
 		@Override
-		public void open(Configuration parameters) throws Exception {
-			// this sink can only work with DOP 1
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-		}
-
-		@Override
-		public void close() throws Exception {
-			boolean seenAll = true;
+		public boolean checkResult(Map<Long, Integer> windowCounts) {
 			if (windowCounts.size() == numKeys) {
-				for (Integer windowCount: windowCounts.values()) {
+				for (Integer windowCount : windowCounts.values()) {
 					if (windowCount < numWindowsExpected) {
-						seenAll = false;
-						break;
+						return false;
 					}
 				}
+				return true;
 			}
-			assertTrue("The source must see all expected windows.", seenAll);
+			return false;
 		}
+	}
 
-		@Override
-		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
-			Integer curr = windowCounts.get(value.f0);
-			if (curr != null) {
-				windowCounts.put(value.f0, curr + 1);
-			}
-			else {
-				windowCounts.put(value.f0, 1);
-			}
-
-			// verify the contents of that window, the contents should be:
-			// (key + num windows so far)
-
-			assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
-
-			boolean seenAll = true;
-			if (windowCounts.size() == numKeys) {
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount < numWindowsExpected) {
-						seenAll = false;
-						break;
-					} else if (windowCount > numWindowsExpected) {
-						fail("Window count to high: " + windowCount);
-					}
-				}
+	static class KeyedEventTimeGenerator implements FailingSource.EventEmittingGenerator {
 
-				if (seenAll) {
-					// exit
-					throw new SuccessException();
-				}
+		private final int keyUniverseSize;
+		private final int watermarkTrailing;
 
-			}
-		}
-
-		@Override
-		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.windowCounts);
+		public KeyedEventTimeGenerator(int keyUniverseSize, int numElementsPerWindow) {
+			this.keyUniverseSize = keyUniverseSize;
+			// we let the watermark a bit behind, so that there can be in-flight timers that required checkpointing
+			// to include correct timer snapshots in our testing.
+			this.watermarkTrailing = 4 * numElementsPerWindow / 3;
 		}
 
 		@Override
-		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+		public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo) {
+			final IntType intTypeNext = new IntType(eventSequenceNo);
+			for (long i = 0; i < keyUniverseSize; i++) {
+				final Tuple2<Long, IntType> generatedEvent = new Tuple2<>(i, intTypeNext);
+				ctx.collectWithTimestamp(generatedEvent, eventSequenceNo);
 			}
-			this.windowCounts.putAll(state.get(0));
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private static class IntType {
-
-		public int value;
-
-		public IntType() {}
 
-		public IntType(int value) {
-			this.value = value;
+			ctx.emitWatermark(new Watermark(eventSequenceNo - watermarkTrailing));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index b6163e8..b0e2967 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,18 +25,17 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.checkpointing.utils.FailingSource;
+import org.apache.flink.test.checkpointing.utils.IntType;
+import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -47,9 +46,7 @@ import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -92,7 +89,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 	@Test
 	public void testTumblingProcessingTimeWindow() {
 		final int numElements = 3000;
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -100,11 +96,14 @@ public class WindowCheckpointingITCase extends TestLogger {
 			env.setStreamTimeCharacteristic(timeCharacteristic);
 			env.getConfig().setAutoWatermarkInterval(10);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
 
+			SinkValidatorUpdaterAndChecker updaterAndChecker =
+				new SinkValidatorUpdaterAndChecker(numElements, 1);
+
 			env
-					.addSource(new FailingSource(numElements, numElements / 3))
+					.addSource(new FailingSource(new Generator(), numElements, timeCharacteristic))
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(100, MILLISECONDS))
@@ -130,11 +129,12 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 							for (Tuple2<Long, IntType> value : values) {
 								assertEquals(value.f0.intValue(), value.f1.value);
-								out.collect(new Tuple2<Long, IntType>(value.f0, new IntType(1)));
+								out.collect(new Tuple2<>(value.f0, new IntType(1)));
 							}
 						}
 					})
-					.addSink(new ValidatingSink(numElements, 1)).setParallelism(1);
+				.addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+				.setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -147,7 +147,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 	@Test
 	public void testSlidingProcessingTimeWindow() {
 		final int numElements = 3000;
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -155,11 +154,12 @@ public class WindowCheckpointingITCase extends TestLogger {
 			env.setStreamTimeCharacteristic(timeCharacteristic);
 			env.getConfig().setAutoWatermarkInterval(10);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
-
+			SinkValidatorUpdaterAndChecker updaterAndChecker =
+				new SinkValidatorUpdaterAndChecker(numElements, 3);
 			env
-					.addSource(new FailingSource(numElements, numElements / 3))
+					.addSource(new FailingSource(new Generator(), numElements, timeCharacteristic))
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS))
@@ -185,13 +185,14 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 							for (Tuple2<Long, IntType> value : values) {
 								assertEquals(value.f0.intValue(), value.f1.value);
-								out.collect(new Tuple2<Long, IntType>(value.f0, new IntType(1)));
+								out.collect(new Tuple2<>(value.f0, new IntType(1)));
 							}
 						}
 					})
-					.addSink(new ValidatingSink(numElements, 3)).setParallelism(1);
+				.addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+				.setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			tryExecute(env, "Sliding Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -202,7 +203,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 	@Test
 	public void testAggregatingTumblingProcessingTimeWindow() {
 		final int numElements = 3000;
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -210,11 +210,12 @@ public class WindowCheckpointingITCase extends TestLogger {
 			env.setStreamTimeCharacteristic(timeCharacteristic);
 			env.getConfig().setAutoWatermarkInterval(10);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
-
+			SinkValidatorUpdaterAndChecker updaterAndChecker =
+				new SinkValidatorUpdaterAndChecker(numElements, 1);
 			env
-					.addSource(new FailingSource(numElements, numElements / 3))
+					.addSource(new FailingSource(new Generator(), numElements, timeCharacteristic))
 					.map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
 						@Override
 						public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
@@ -234,9 +235,10 @@ public class WindowCheckpointingITCase extends TestLogger {
 							return new Tuple2<>(a.f0, new IntType(1));
 						}
 					})
-					.addSink(new ValidatingSink(numElements, 1)).setParallelism(1);
+				.addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+				.setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			tryExecute(env, "Aggregating Tumbling Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -247,7 +249,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 	@Test
 	public void testAggregatingSlidingProcessingTimeWindow() {
 		final int numElements = 3000;
-		FailingSource.reset();
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -255,11 +256,12 @@ public class WindowCheckpointingITCase extends TestLogger {
 			env.setStreamTimeCharacteristic(timeCharacteristic);
 			env.getConfig().setAutoWatermarkInterval(10);
 			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 			env.getConfig().disableSysoutLogging();
-
+			SinkValidatorUpdaterAndChecker updaterAndChecker =
+				new SinkValidatorUpdaterAndChecker(numElements, 3);
 			env
-					.addSource(new FailingSource(numElements, numElements / 3))
+					.addSource(new FailingSource(new Generator(), numElements, timeCharacteristic))
 					.map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
 						@Override
 						public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
@@ -278,9 +280,10 @@ public class WindowCheckpointingITCase extends TestLogger {
 							return new Tuple2<>(a.f0, new IntType(1));
 						}
 					})
-					.addSink(new ValidatingSink(numElements, 3)).setParallelism(1);
+				.addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+				.setParallelism(1);
 
-			tryExecute(env, "Tumbling Window Test");
+			tryExecute(env, "Aggregating Sliding Window Test");
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -292,152 +295,50 @@ public class WindowCheckpointingITCase extends TestLogger {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements ListCheckpointed<Integer>, CheckpointListener {
-		private static volatile boolean failedBefore = false;
-
-		private final int numElementsToEmit;
-		private final int failureAfterNumElements;
-
-		private volatile int numElementsEmitted;
-		private volatile int numSuccessfulCheckpoints;
-		private volatile boolean running = true;
-
-		private FailingSource(int numElementsToEmit, int failureAfterNumElements) {
-			this.numElementsToEmit = numElementsToEmit;
-			this.failureAfterNumElements = failureAfterNumElements;
-		}
-
-		@Override
-		public void open(Configuration parameters) {
-			// non-parallel source
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
-			// we loop longer than we have elements, to permit delayed checkpoints
-			// to still cause a failure
-			while (running) {
-				if (!failedBefore) {
-					// delay a bit, if we have not failed before
-					Thread.sleep(1);
-					if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
-						// cause a failure if we have not failed before and have reached
-						// enough completed checkpoints and elements
-						failedBefore = true;
-						throw new Exception("Artificial Failure");
-					}
-				}
-
-				if (numElementsEmitted < numElementsToEmit &&
-						(failedBefore || numElementsEmitted <= failureAfterNumElements)) {
-					// the function failed before, or we are in the elements before the failure
-					synchronized (ctx.getCheckpointLock()) {
-						int next = numElementsEmitted++;
-						ctx.collect(new Tuple2<Long, IntType>((long) next, new IntType(next)));
-					}
-				} else {
-					// if our work is done, delay a bit to prevent busy waiting
-					Thread.sleep(10);
-				}
-			}
-		}
+	static class Generator implements FailingSource.EventEmittingGenerator {
 
 		@Override
-		public void cancel() {
-			running = false;
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			numSuccessfulCheckpoints++;
-		}
-
-		@Override
-		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.numElementsEmitted);
-		}
-
-		@Override
-		public void restoreState(List<Integer> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			this.numElementsEmitted = state.get(0);
-		}
-
-		public static void reset() {
-			failedBefore = false;
+		public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo) {
+			ctx.collect(new Tuple2<>((long) eventSequenceNo, new IntType(eventSequenceNo)));
 		}
 	}
 
-	private static class ValidatingSink extends RichSinkFunction<Tuple2<Long, IntType>>
-			implements ListCheckpointed<HashMap<Long, Integer>> {
-
-		private final HashMap<Long, Integer> counts = new HashMap<>();
+	static class SinkValidatorUpdaterAndChecker
+		implements ValidatingSink.CountUpdater<Tuple2<Long, IntType>>, ValidatingSink.ResultChecker {
 
 		private final int elementCountExpected;
 		private final int countPerElementExpected;
 
-		private int aggCount;
-
-		private ValidatingSink(int elementCountExpected, int countPerElementExpected) {
+		SinkValidatorUpdaterAndChecker(int elementCountExpected, int countPerElementExpected) {
 			this.elementCountExpected = elementCountExpected;
 			this.countPerElementExpected = countPerElementExpected;
 		}
 
 		@Override
-		public void open(Configuration parameters) throws Exception {
-			// this sink can only work with DOP 1
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-			checkSuccess();
+		public void updateCount(Tuple2<Long, IntType> value, Map<Long, Integer> windowCounts) {
+			windowCounts.merge(value.f0, value.f1.value, (a, b) -> a + b);
 		}
 
 		@Override
-		public void invoke(Tuple2<Long, IntType> value) throws Exception {
-			Integer curr = counts.get(value.f0);
-			if (curr != null) {
-				counts.put(value.f0, curr + value.f1.value);
-			}
-			else {
-				counts.put(value.f0, value.f1.value);
-			}
+		public boolean checkResult(Map<Long, Integer> windowCounts) {
+			int aggCount = 0;
 
-			// check if we have seen all we expect
-			aggCount += value.f1.value;
-			checkSuccess();
-		}
-
-		private void checkSuccess() throws SuccessException {
-			if (aggCount >= elementCountExpected * countPerElementExpected) {
-				// we are done. validate
-				assertEquals(elementCountExpected, counts.size());
-
-				for (Integer i : counts.values()) {
-					assertEquals(countPerElementExpected, i.intValue());
-				}
-
-				// exit
-				throw new SuccessException();
+			for (Integer i : windowCounts.values()) {
+				aggCount += i;
 			}
-		}
 
-		@Override
-		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.counts);
-		}
-
-		@Override
-		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			if (aggCount < elementCountExpected * countPerElementExpected
+				|| elementCountExpected != windowCounts.size()) {
+				return false;
 			}
-			this.counts.putAll(state.get(0));
 
-			for (Integer i : state.get(0).values()) {
-				this.aggCount += i;
+			for (int i : windowCounts.values()) {
+				if (countPerElementExpected != i) {
+					return false;
+				}
 			}
+
+			return true;
 		}
 	}
 
@@ -452,22 +353,4 @@ public class WindowCheckpointingITCase extends TestLogger {
 				new TimeCharacteristic[]{TimeCharacteristic.IngestionTime}
 		);
 	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * POJO with int value.
-	 */
-	public static class IntType {
-
-		public int value;
-
-		public IntType() {}
-
-		public IntType(int value) {
-			this.value = value;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
new file mode 100644
index 0000000..822d73b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Source for window checkpointing IT cases that can introduce artificial failures.
+ */
+public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
+	implements ListCheckpointed<Integer>, CheckpointListener {
+
+	/**
+	 * Function to generate and emit the test events (and watermarks if required).
+	 */
+	@FunctionalInterface
+	public interface EventEmittingGenerator extends Serializable {
+		void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo);
+	}
+
+	private static final long INITIAL = Long.MIN_VALUE;
+	private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MIN_VALUE;
+
+	@Nonnull
+	private final EventEmittingGenerator eventEmittingGenerator;
+	private final int expectedEmitCalls;
+	private final int failureAfterNumElements;
+	private final boolean usingProcessingTime;
+	private final AtomicLong checkpointStatus;
+
+	private int emitCallCount;
+	private volatile boolean running;
+
+	public FailingSource(
+		@Nonnull EventEmittingGenerator eventEmittingGenerator,
+		@Nonnegative int numberOfGeneratorInvocations) {
+		this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime);
+	}
+
+	public FailingSource(
+		@Nonnull EventEmittingGenerator eventEmittingGenerator,
+		@Nonnegative int numberOfGeneratorInvocations,
+		@Nonnull TimeCharacteristic timeCharacteristic) {
+		this.eventEmittingGenerator = eventEmittingGenerator;
+		this.running = true;
+		this.emitCallCount = 0;
+		this.expectedEmitCalls = numberOfGeneratorInvocations;
+		this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
+		this.checkpointStatus = new AtomicLong(INITIAL);
+		this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime;
+	}
+
+	@Override
+	public void open(Configuration parameters) {
+		// non-parallel source
+		assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+	}
+
+	@Override
+	public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+
+		final RuntimeContext runtimeContext = getRuntimeContext();
+		// detect if this task is "the chosen one" and should fail (via subtaskidx), if it did not fail before (via attempt)
+		final boolean failThisTask =
+			runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask() == 0;
+
+		// we loop longer than we have elements, to permit delayed checkpoints
+		// to still cause a failure
+		while (running) {
+
+			// the function failed before, or we are in the elements before the failure
+			synchronized (ctx.getCheckpointLock()) {
+				eventEmittingGenerator.emitEvent(ctx, emitCallCount++);
+				running &= (emitCallCount < expectedEmitCalls);
+			}
+
+			if (emitCallCount < failureAfterNumElements) {
+				Thread.sleep(1);
+			} else if (failThisTask && emitCallCount == failureAfterNumElements) {
+				// wait for a pending checkpoint that fulfills our requirements if needed
+				while (checkpointStatus.get() != STATEFUL_CHECKPOINT_COMPLETED) {
+					Thread.sleep(1);
+				}
+				throw new Exception("Artificial Failure");
+			}
+		}
+
+		if (usingProcessingTime) {
+			while (running) {
+				Thread.sleep(10);
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		// This will unblock the task for failing, if this is the checkpoint we are waiting for
+		checkpointStatus.compareAndSet(checkpointId, STATEFUL_CHECKPOINT_COMPLETED);
+	}
+
+	@Override
+	public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+		// We accept a checkpoint as basis if it should have a "decent amount" of state
+		if (emitCallCount > failureAfterNumElements / 2) {
+			// This means we are waiting for notification of this checkpoint to completed now.
+			checkpointStatus.compareAndSet(INITIAL, checkpointId);
+		}
+		return Collections.singletonList(this.emitCallCount);
+	}
+
+	@Override
+	public void restoreState(List<Integer> state) throws Exception {
+		if (state.isEmpty() || state.size() > 1) {
+			throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+		}
+		this.emitCallCount = state.get(0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
new file mode 100644
index 0000000..3bc4ea0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+/**
+ * Test type that wraps an int.
+ *
+ * <p>The class is public, has a public no-argument constructor and exposes its state through a public
+ * field, so that Flink's type extraction can handle it as a POJO type instead of falling back to
+ * generic (Kryo) serialization.
+ */
+public class IntType {
+
+	/** The wrapped value. Public and mutable so that the class satisfies the POJO rules. */
+	public int value;
+
+	/**
+	 * Creates an instance wrapping {@code 0}. Required for the class to qualify as a POJO.
+	 */
+	public IntType() {}
+
+	/**
+	 * Creates an instance wrapping the given value.
+	 *
+	 * @param value the value to wrap.
+	 */
+	public IntType(int value) {
+		this.value = value;
+	}
+
+	@Override
+	public String toString() {
+		return "IntType{" +
+			"value=" + value +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
new file mode 100644
index 0000000..b352738
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.SuccessException;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Sink shared by the window checkpointing IT cases. Every received record is folded into a map of counts
+ * by a {@link CountUpdater}, and that map is validated by a {@link ResultChecker}.
+ *
+ * <p>In processing-time mode a {@link SuccessException} is thrown as soon as the checker accepts the counts
+ * (checked on open, after every record, and on close). In every mode {@link #close()} throws an
+ * {@link AssertionError} if the checker rejects the counts. The map is checkpointed as the single element
+ * of the list state, and the sink requires a parallelism of 1.
+ */
+public class ValidatingSink<T> extends RichSinkFunction<T>
+	implements ListCheckpointed<HashMap<Long, Integer>> {
+
+	/**
+	 * Decides whether the accumulated window counts constitute the expected result.
+	 */
+	@FunctionalInterface
+	public interface ResultChecker extends Serializable {
+		boolean checkResult(Map<Long, Integer> windowCounts);
+	}
+
+	/**
+	 * Applies a single incoming record to the accumulated window counts.
+	 *
+	 * @param <T> type of the update event.
+	 */
+	public interface CountUpdater<T> extends Serializable {
+		void updateCount(T update, Map<Long, Integer> windowCounts);
+	}
+
+	/** Validates the accumulated counts. */
+	@Nonnull
+	private final ResultChecker resultChecker;
+
+	/** Folds each incoming record into {@link #windowCounts}. */
+	@Nonnull
+	private final CountUpdater<T> countUpdater;
+
+	/** Accumulated counts, keyed as decided by the {@link CountUpdater}; this map is the checkpointed state. */
+	@Nonnull
+	private final HashMap<Long, Integer> windowCounts;
+
+	/** Whether success is signalled by throwing {@link SuccessException} (processing-time mode only). */
+	private final boolean usingProcessingTime;
+
+	/**
+	 * Creates a sink that uses {@link TimeCharacteristic#EventTime}.
+	 *
+	 * @param countUpdater folds records into the counts.
+	 * @param resultChecker validates the counts.
+	 */
+	public ValidatingSink(
+		@Nonnull CountUpdater<T> countUpdater,
+		@Nonnull ResultChecker resultChecker) {
+		this(countUpdater, resultChecker, TimeCharacteristic.EventTime);
+	}
+
+	/**
+	 * Creates a sink for the given time characteristic.
+	 *
+	 * @param countUpdater folds records into the counts.
+	 * @param resultChecker validates the counts.
+	 * @param timeCharacteristic only {@link TimeCharacteristic#ProcessingTime} enables eager success signalling.
+	 */
+	public ValidatingSink(
+		@Nonnull CountUpdater<T> countUpdater,
+		@Nonnull ResultChecker resultChecker,
+		@Nonnull TimeCharacteristic timeCharacteristic) {
+
+		this.countUpdater = countUpdater;
+		this.resultChecker = resultChecker;
+		this.windowCounts = new HashMap<>();
+		this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		// the counts are only meaningful if a single subtask receives every record
+		assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		// windowCounts may already hold restored state at this point
+		signalSuccessIfDone();
+	}
+
+	@Override
+	public void close() {
+		final boolean resultAccepted = resultChecker.checkResult(windowCounts);
+		if (!resultAccepted) {
+			throw new AssertionError("Test failed check.");
+		}
+		if (usingProcessingTime) {
+			throw new SuccessException();
+		}
+	}
+
+	@Override
+	public void invoke(T value, Context context) throws Exception {
+		countUpdater.updateCount(value, windowCounts);
+		signalSuccessIfDone();
+	}
+
+	@Override
+	public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+		// the whole map is stored as the only element of the list state
+		return Collections.singletonList(windowCounts);
+	}
+
+	@Override
+	public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+		final int recoveredSize = state.size();
+		if (recoveredSize != 1) {
+			throw new RuntimeException("Test failed due to unexpected recovered state size " + recoveredSize);
+		}
+		windowCounts.putAll(state.get(0));
+	}
+
+	/**
+	 * Throws {@link SuccessException} when running in processing-time mode and the checker accepts the
+	 * current counts; otherwise does nothing.
+	 */
+	private void signalSuccessIfDone() {
+		if (usingProcessingTime && resultChecker.checkResult(windowCounts)) {
+			throw new SuccessException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
index 22ac02b..d8e2a8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
@@ -21,6 +21,6 @@ package org.apache.flink.test.util;
 /**
  * Exception that is thrown to terminate a program and indicate success.
+ *
+ * <p>It is unchecked, so it can be thrown from methods that declare no {@code throws} clause.
  */
-public class SuccessException extends Exception {
+public class SuccessException extends RuntimeException {
 	private static final long serialVersionUID = -7011865671593955887L;
 }