You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/08/11 15:53:35 UTC

[1/2] flink git commit: [FLINK-2423] [streaming] ITCase for checkpoint notifications

Repository: flink
Updated Branches:
  refs/heads/master f50ae26a2 -> 10ce2e2d0


[FLINK-2423] [streaming] ITCase for checkpoint notifications

Closes #980


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10ce2e2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10ce2e2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10ce2e2d

Branch: refs/heads/master
Commit: 10ce2e2d033ddafb326c3b9b81e19e669c19df96
Parents: 4ca7df5
Author: mbalassi <mb...@apache.org>
Authored: Sat Aug 8 22:02:58 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Aug 11 14:12:00 2015 +0200

----------------------------------------------------------------------
 .../StreamCheckpointNotifierITCase.java         | 135 ++++++++++---------
 1 file changed, 70 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10ce2e2d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 4c1ac4e..de8ee9d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -31,6 +32,8 @@ import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
@@ -41,7 +44,6 @@ import java.util.Random;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Integration test for the {@link CheckpointNotifier} interface. The test ensures that
@@ -50,6 +52,10 @@ import static org.junit.Assert.fail;
  * called for a deliberately failed checkpoint.
  *
  * <p>
+ * The topology tested here includes a number of {@link OneInputStreamOperator}s and a
+ * {@link TwoInputStreamOperator}.
+ *
+ * <p>
  * Note that as a result of doing the checks on the task level there is no way to verify
  * that the {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for every
  * successfully completed checkpoint.
@@ -57,40 +63,37 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase {
 
-	final long NUM_STRINGS = 10_000_000L;
+	final long NUM_LONGS = 10_000_000L;
 
 	/**
 	 * Runs the following program:
 	 *
 	 * <pre>
-	 *     [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+	 *     [ (source)->(filter) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
 	 */
 	@Override
 	public void testProgram(StreamExecutionEnvironment env) {
 
-		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+		DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(NUM_LONGS));
 
 		stream
 				// -------------- first vertex, chained to the src ----------------
-				.filter(new StringRichFilterFunction())
+				.filter(new LongRichFilterFunction())
 
-						// -------------- second vertex, applying the co-map ----------------
+				// -------------- second vertex, applying the co-map ----------------
 				.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
 
 				// -------------- third vertex - the stateful one that also fails ----------------
-				.map(new StringPrefixCountRichMapFunction())
-				.startNewChain()
 				.map(new IdentityMapFunction())
+				.startNewChain()
 
-						// -------------- fourth vertex - reducer and the sink ----------------
-				.groupBy("prefix")
-				.reduce(new OnceFailingReducer(NUM_STRINGS))
-				.addSink(new SinkFunction<PrefixCount>() {
+				// -------------- fourth vertex - reducer and the sink ----------------
+				.groupBy(0)
+				.reduce(new OnceFailingReducer(NUM_LONGS))
+				.addSink(new SinkFunction<Tuple1<Long>>() {
 					@Override
-					public void invoke(PrefixCount value) {
+					public void invoke(Tuple1<Long> value) {
 						// do nothing
 					}
 				});
@@ -98,18 +101,24 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 
 	@Override
 	public void postSubmit() {
-		List[][] checkList = new List[][]{	StringGeneratingSourceFunction.completedCheckpoints,
+		List[][] checkList = new List[][]{	GeneratingSourceFunction.completedCheckpoints,
 				IdentityMapFunction.completedCheckpoints,
-				StringPrefixCountRichMapFunction.completedCheckpoints,
+				LongRichFilterFunction.completedCheckpoints,
 				LeftIdentityCoRichFlatMapFunction.completedCheckpoints};
 
+		long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
+
 		for(List[] parallelNotifications : checkList) {
 			for (int i = 0; i < PARALLELISM; i++){
 				List<Long> notifications = parallelNotifications[i];
 				assertTrue("No checkpoint notification was received.",
 						notifications.size() > 0);
 				assertFalse("Failure checkpoint was marked as completed.",
-						notifications.contains(OnceFailingReducer.failureCheckpointID));
+						notifications.contains(failureCheckpointID));
+				assertFalse("No checkpoint received before failure.",
+						notifications.get(0) == failureCheckpointID);
+				assertFalse("No checkpoint received after failure.",
+						notifications.get(notifications.size() - 1) == failureCheckpointID);
 				assertTrue("Checkpoint notification was received multiple times",
 						notifications.size() == new HashSet<Long>(notifications).size());
 			}
@@ -120,17 +129,20 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
 
-	private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
-			implements  ParallelSourceFunction<String>, CheckpointNotifier {
+	/**
+	 * Generates some Long values and as an implementation for the {@link CheckpointNotifier}
+	 * interface it stores all the checkpoint ids it has seen in a static list.
+	 */
+	private static class GeneratingSourceFunction extends RichSourceFunction<Long>
+			implements  ParallelSourceFunction<Long>, CheckpointNotifier {
 
 		// operator life cycle
 		private volatile boolean isRunning;
 
 		// operator behaviour
 		private final long numElements;
-		private Random rnd;
+		private long result;
 
-		private StringBuilder stringBuilder;
 		private OperatorState<Integer> index;
 		private int step;
 
@@ -138,14 +150,12 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 		private int subtaskId;
 		public static List[] completedCheckpoints = new List[PARALLELISM];
 
-		StringGeneratingSourceFunction(long numElements) {
+		GeneratingSourceFunction(long numElements) {
 			this.numElements = numElements;
 		}
 
 		@Override
 		public void open(Configuration parameters) throws IOException {
-			rnd = new Random();
-			stringBuilder = new StringBuilder();
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
 			subtaskId = getRuntimeContext().getIndexOfThisSubtask();
 			index = getRuntimeContext().getOperatorState("index", subtaskId, false);
@@ -159,16 +169,12 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 		}
 
 		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
+		public void run(SourceContext<Long> ctx) throws Exception {
 			final Object lockingObject = ctx.getCheckpointLock();
 
 			while (isRunning && index.value() < numElements) {
-				char first = (char) ((index.value() % 40) + 40);
 
-				stringBuilder.setLength(0);
-				stringBuilder.append(first);
-
-				String result = randomString(stringBuilder, rnd);
+				result = index.value() % 10;
 
 				synchronized (lockingObject) {
 					index.update(index.value() + step);
@@ -182,32 +188,25 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 			isRunning = false;
 		}
 
-		private static String randomString(StringBuilder bld, Random rnd) {
-			final int len = rnd.nextInt(10) + 5;
-
-			for (int i = 0; i < len; i++) {
-				char next = (char) (rnd.nextInt(20000) + 33);
-				bld.append(next);
-			}
-
-			return bld.toString();
-		}
-
 		@Override
 		public void notifyCheckpointComplete(long checkpointId) throws Exception {
 			completedCheckpoints[subtaskId].add(checkpointId);
 		}
 	}
 
-	private static class IdentityMapFunction extends RichMapFunction<PrefixCount, PrefixCount>
+	/**
+	 * Identity transform on Long values wrapping the output in a tuple. As an implementation
+	 * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids it has seen in a static list.
+	 */
+	private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>>
 			implements CheckpointNotifier {
 
 		public static List[] completedCheckpoints = new List[PARALLELISM];
 		private int subtaskId;
 
 		@Override
-		public PrefixCount map(PrefixCount value) throws Exception {
-			return value;
+		public Tuple1<Long> map(Long value) throws Exception {
+			return Tuple1.of(value);
 		}
 
 		@Override
@@ -226,7 +225,10 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 		}
 	}
 
-	private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> implements Checkpointed<Long>{
+	/**
+	 * Reducer that causes one failure after seeing between 40% and 70% of the records.
+	 */
+	private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> implements Checkpointed<Long> {
 
 		private static volatile boolean hasFailed = false;
 		public static volatile long failureCheckpointID;
@@ -251,9 +253,9 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 		}
 
 		@Override
-		public PrefixCount reduce(PrefixCount value1, PrefixCount value2) throws Exception {
+		public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) throws Exception {
 			count++;
-			value1.count += value2.count;
+			value1.f0 += value2.f0;
 			return value1;
 		}
 
@@ -273,20 +275,23 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 		}
 	}
 
-	private static class StringRichFilterFunction implements FilterFunction<String> {
-		@Override
-		public boolean filter(String value) {
-			return value.length() < 100;
-		}
-	}
-
-	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
+	/**
+	 * Filter on Long values supposedly letting all values through. As an implementation
+	 * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids
+	 * it has seen in a static list.
+	 */
+	private static class LongRichFilterFunction extends RichFilterFunction<Long>
 			implements CheckpointNotifier {
 
 		public static List[] completedCheckpoints = new List[PARALLELISM];
 		private int subtaskId;
 
 		@Override
+		public boolean filter(Long value) {
+			return value < 100;
+		}
+
+		@Override
 		public void open(Configuration conf) throws IOException {
 			subtaskId = getRuntimeContext().getIndexOfThisSubtask();
 
@@ -300,14 +305,14 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 		public void notifyCheckpointComplete(long checkpointId) throws Exception {
 			completedCheckpoints[subtaskId].add(checkpointId);
 		}
-
-		@Override
-		public PrefixCount map(String value) throws IOException {
-			return new PrefixCount(value.substring(0, 1), value, 1L);
-		}
 	}
 
-	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String>
+	/**
+	 * CoFlatMap on Long values as identity transform on the left input, while ignoring the right.
+	 * As an implementation for the {@link CheckpointNotifier} interface it stores all the checkpoint
+	 * ids it has seen in a static list.
+	 */
+	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long>
 			implements CheckpointNotifier {
 
 		public static List[] completedCheckpoints = new List[PARALLELISM];
@@ -324,12 +329,12 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 		}
 
 		@Override
-		public void flatMap1(String value, Collector<String> out) throws IOException {
+		public void flatMap1(Long value, Collector<Long> out) throws IOException {
 			out.collect(value);
 		}
 
 		@Override
-		public void flatMap2(String value, Collector<String> out) throws IOException {
+		public void flatMap2(Long value, Collector<Long> out) throws IOException {
 			// we ignore the values from the second input
 		}
 


[2/2] flink git commit: [streaming] [tests] Checkpointing tests refactor & cleanup

Posted by mb...@apache.org.
[streaming] [tests] Checkpointing tests refactor & cleanup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ca7df59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ca7df59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ca7df59

Branch: refs/heads/master
Commit: 4ca7df592538edb6fd69159c7b955673ed3af820
Parents: f50ae26
Author: mbalassi <mb...@apache.org>
Authored: Tue Aug 4 10:11:10 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Aug 11 14:12:00 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |   2 +-
 .../CoStreamCheckpointingITCase.java            | 215 ++++--------
 .../PartitionedStateCheckpointingITCase.java    |  92 ++---
 .../checkpointing/StateCheckpoinedITCase.java   | 158 +++------
 .../StreamCheckpointNotifierITCase.java         | 341 +++++++++++++++++++
 .../StreamCheckpointingITCase.java              | 224 ++++--------
 .../StreamFaultToleranceTestBase.java           | 150 ++++++++
 7 files changed, 693 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 080272b..b20482e 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1355,7 +1355,7 @@ public static class CounterSource implements RichParallelSourceFunction<Long> {
 }
 {% endhighlight %}
 
-Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface.
+Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
 
 ### Checkpointed interface
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index 6197092..258bee8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -22,28 +22,22 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Random;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled. This differs from
@@ -64,44 +58,9 @@ import static org.junit.Assert.*;
  * state reflects the "exactly once" semantics.
  */
 @SuppressWarnings("serial")
-public class CoStreamCheckpointingITCase {
-
-	private static final int NUM_TASK_MANAGERS = 2;
-	private static final int NUM_TASK_SLOTS = 3;
-	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
-	private static ForkableFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
-			cluster = new ForkableFlinkMiniCluster(config, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
-	}
-
+public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
+	final long NUM_STRINGS = 10_000_000L;
 
 	/**
 	 * Runs the following program:
@@ -110,102 +69,76 @@ public class CoStreamCheckpointingITCase {
 	 *     [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
 	 */
-	@Test
-	public void runCheckpointedProgram() {
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
 
-		final long NUM_STRINGS = 10000000L;
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getJobManagerRPCPort());
-			env.setParallelism(PARALLELISM);
-			env.enableCheckpointing(500);
-			env.getConfig().disableSysoutLogging();
-
-			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
-
-			stream
-					// -------------- first vertex, chained to the source ----------------
-					.filter(new StringRichFilterFunction())
-
-					// -------------- second vertex - the stateful one that also fails ----------------
-					.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
-
-					// -------------- third vertex - the stateful one that also fails ----------------
-					.map(new StringPrefixCountRichMapFunction())
-					.startNewChain()
-					.map(new StatefulCounterFunction())
-
-							// -------------- fourth vertex - reducer and the sink ----------------
-					.groupBy("prefix")
-					.reduce(new OnceFailingReducer(NUM_STRINGS))
-					.addSink(new RichSinkFunction<PrefixCount>() {
-
-						private Map<Character, Long> counts = new HashMap<Character, Long>();
-
-						@Override
-						public void invoke(PrefixCount value) {
-							Character first = value.prefix.charAt(0);
-							Long previous = counts.get(first);
-							if (previous == null) {
-								counts.put(first, value.count);
-							} else {
-								counts.put(first, Math.max(previous, value.count));
-							}
-						}
-
-//						@Override
-//						public void close() {
-//							for (Long count : counts.values()) {
-//								assertEquals(NUM_STRINGS / 40, count.longValue());
-//							}
-//						}
-					});
-
-			env.execute();
-
-			long filterSum = 0;
-			for (long l : StringRichFilterFunction.counts) {
-				filterSum += l;
-			}
+		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
 
-			long coMapSum = 0;
-			for (long l : LeftIdentityCoRichFlatMapFunction.counts) {
-				coMapSum += l;
-			}
+		stream
+				// -------------- first vertex, chained to the source ----------------
+				.filter(new StringRichFilterFunction())
 
-			long mapSum = 0;
-			for (long l : StringPrefixCountRichMapFunction.counts) {
-				mapSum += l;
-			}
+						// -------------- second vertex - the stateful one that also fails ----------------
+				.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
 
-			long countSum = 0;
-			for (long l : StatefulCounterFunction.counts) {
-				countSum += l;
-			}
+				// -------------- third vertex - the stateful one that also fails ----------------
+				.map(new StringPrefixCountRichMapFunction())
+				.startNewChain()
+				.map(new StatefulCounterFunction())
 
-			if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) {
-				Assert.fail("Restore was never called on counting Map function.");
-			}
+						// -------------- fourth vertex - reducer and the sink ----------------
+				.groupBy("prefix")
+				.reduce(new OnceFailingReducer(NUM_STRINGS))
+				.addSink(new SinkFunction<PrefixCount>() {
 
-			if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) {
-				Assert.fail("Restore was never called on counting CoMap function.");
-			}
+					@Override
+					public void invoke(PrefixCount value) throws Exception {
+						// Do nothing here
+					}
+				});
+	}
+
+	@Override
+	public void postSubmit() {
+		long filterSum = 0;
+		for (long l : StringRichFilterFunction.counts) {
+			filterSum += l;
+		}
 
-			// verify that we counted exactly right
+		long coMapSum = 0;
+		for (long l : LeftIdentityCoRichFlatMapFunction.counts) {
+			coMapSum += l;
+		}
+
+		long mapSum = 0;
+		for (long l : StringPrefixCountRichMapFunction.counts) {
+			mapSum += l;
+		}
 
-			assertEquals(NUM_STRINGS, filterSum);
-			assertEquals(NUM_STRINGS, coMapSum);
-			assertEquals(NUM_STRINGS, mapSum);
-			assertEquals(NUM_STRINGS, countSum);
+		long countSum = 0;
+		for (long l : StatefulCounterFunction.counts) {
+			countSum += l;
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+
+		if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) {
+			Assert.fail("Restore was never called on counting Map function.");
+		}
+
+		if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) {
+			Assert.fail("Restore was never called on counting CoMap function.");
 		}
+
+		// verify that we counted exactly right
+
+		assertEquals(NUM_STRINGS, filterSum);
+		assertEquals(NUM_STRINGS, coMapSum);
+		assertEquals(NUM_STRINGS, mapSum);
+		assertEquals(NUM_STRINGS, countSum);
 	}
 
+
 	// --------------------------------------------------------------------------------------------
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
@@ -260,7 +193,6 @@ public class CoStreamCheckpointingITCase {
 
 				synchronized (lockingObject) {
 					index.update(index.value() + step);
-//					System.out.println("SOURCE EMIT: " + result);
 					ctx.collect(result);
 				}
 			}
@@ -341,30 +273,6 @@ public class CoStreamCheckpointingITCase {
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//  Custom Type Classes
-	// --------------------------------------------------------------------------------------------
-
-	public static class PrefixCount {
-
-		public String prefix;
-		public String value;
-		public long count;
-
-		public PrefixCount() {}
-
-		public PrefixCount(String prefix, String value, long count) {
-			this.prefix = prefix;
-			this.value = value;
-			this.count = count;
-		}
-
-		@Override
-		public String toString() {
-			return prefix + " / " + value;
-		}
-	}
-
 	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
 
 		Long count = 0L;
@@ -434,7 +342,6 @@ public class CoStreamCheckpointingITCase {
 		@Override
 		public void flatMap1(String value, Collector<String> out) throws IOException {
 			count += 1;
-//			System.out.println("Co-Map COUNT: " + count);
 
 			out.collect(value);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 88361e2..d942a9e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -52,82 +52,36 @@ import org.junit.Test;
  * It is designed to check partitioned states.
  */
 @SuppressWarnings("serial")
-public class PartitionedStateCheckpointingITCase {
-
-	private static final int NUM_TASK_MANAGERS = 2;
-	private static final int NUM_TASK_SLOTS = 3;
-	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
-	private static ForkableFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
-			cluster = new ForkableFlinkMiniCluster(config, false);
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
-	}
+public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTestBase {
 
-	@SuppressWarnings("unchecked")
-	@Test
-	public void runCheckpointedProgram() {
+	final long NUM_STRINGS = 10_000_000L;
 
-		final long NUM_STRINGS = 10000000L;
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
 		assertTrue("Broken test setup", (NUM_STRINGS/2) % 40 == 0);
 
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost",
-					cluster.getJobManagerRPCPort());
-			env.setParallelism(PARALLELISM);
-			env.enableCheckpointing(500);
-			env.getConfig().disableSysoutLogging();
-
-			DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
-			DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
+		DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
+		DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
 
-			stream1.union(stream2)
-					.groupBy(new IdentityKeySelector<Integer>())
-					.map(new OnceFailingPartitionedSum(NUM_STRINGS))
-					.keyBy(0)
-					.addSink(new CounterSink());
-
-			env.execute();
+		stream1.union(stream2)
+				.groupBy(new IdentityKeySelector<Integer>())
+				.map(new OnceFailingPartitionedSum(NUM_STRINGS))
+				.keyBy(0)
+				.addSink(new CounterSink());
+	}
 
-			// verify that we counted exactly right
-			for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
-				assertEquals(new Long(sum.getKey() * NUM_STRINGS / 40), sum.getValue());
-			}
-			System.out.println("new");
-			for (Long count : CounterSink.allCounts.values()) {
-				assertEquals(new Long(NUM_STRINGS / 40), count);
-			}
-			
-			assertEquals(40, CounterSink.allCounts.size());
-			assertEquals(40, OnceFailingPartitionedSum.allSums.size());
-			
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+	@Override
+	public void postSubmit() {
+		// verify that we counted exactly right
+		for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
+			assertEquals(new Long(sum.getKey() * NUM_STRINGS / 40), sum.getValue());
+		}
+		for (Long count : CounterSink.allCounts.values()) {
+			assertEquals(new Long(NUM_STRINGS / 40), count);
 		}
+
+		assertEquals(40, CounterSink.allCounts.size());
+		assertEquals(40, OnceFailingPartitionedSum.allSums.size());
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 39ff2e5..072086b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -49,48 +49,15 @@ import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
- * 
+ *
  * The test triggers a failure after a while and verifies that, after completion, the
- * state reflects the "exactly once" semantics.
+ * state defined with either the {@link OperatorState} or the {@link Checkpointed}
+ * interface reflects the "exactly once" semantics.
  */
 @SuppressWarnings("serial")
-public class StateCheckpoinedITCase {
-
-	private static final int NUM_TASK_MANAGERS = 2;
-	private static final int NUM_TASK_SLOTS = 3;
-	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
-	private static ForkableFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-			
-			cluster = new ForkableFlinkMiniCluster(config, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
-	}
+public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 
+	final long NUM_STRINGS = 10_000_000L;
 
 	/**
 	 * Runs the following program:
@@ -99,69 +66,56 @@ public class StateCheckpoinedITCase {
 	 *     [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
 	 */
-	@Test
-	public void runCheckpointedProgram() {
-
-		final long NUM_STRINGS = 10000000L;
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-		
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-																	"localhost", cluster.getJobManagerRPCPort());
-			env.setParallelism(PARALLELISM);
-			env.enableCheckpointing(500);
-			env.getConfig().disableSysoutLogging();
-
-			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
-			
-			stream
-					// -------------- first vertex, chained to the source ----------------
-					.filter(new StringRichFilterFunction())
 
-					// -------------- seconds vertex - one-to-one connected ----------------
-					.map(new StringPrefixCountRichMapFunction())
-					.startNewChain()
-					.map(new StatefulCounterFunction())
+		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
 
-					// -------------- third vertex - reducer and the sink ----------------
-					.partitionByHash("prefix")
-					.flatMap(new OnceFailingAggregator(NUM_STRINGS))
-					.addSink(new ValidatingSink());
+		stream
+				// -------------- first vertex, chained to the source ----------------
+				.filter(new StringRichFilterFunction())
 
-			env.execute();
-			
-			long filterSum = 0;
-			for (long l : StringRichFilterFunction.counts) {
-				filterSum += l;
-			}
+						// -------------- second vertex - one-to-one connected ----------------
+				.map(new StringPrefixCountRichMapFunction())
+				.startNewChain()
+				.map(new StatefulCounterFunction())
 
-			long mapSum = 0;
-			for (long l : StringPrefixCountRichMapFunction.counts) {
-				mapSum += l;
-			}
+						// -------------- third vertex - reducer and the sink ----------------
+				.partitionByHash("prefix")
+				.flatMap(new OnceFailingAggregator(NUM_STRINGS))
+				.addSink(new ValidatingSink());
+	}
 
-			long countSum = 0;
-			for (long l : StatefulCounterFunction.counts) {
-				countSum += l;
-			}
+	@Override
+	public void postSubmit() {
+		long filterSum = 0;
+		for (long l : StringRichFilterFunction.counts) {
+			filterSum += l;
+		}
+
+		long mapSum = 0;
+		for (long l : StringPrefixCountRichMapFunction.counts) {
+			mapSum += l;
+		}
 
-			// verify that we counted exactly right
-			assertEquals(NUM_STRINGS, filterSum);
-			assertEquals(NUM_STRINGS, mapSum);
-			assertEquals(NUM_STRINGS, countSum);
+		long countSum = 0;
+		for (long l : StatefulCounterFunction.counts) {
+			countSum += l;
+		}
 
-			for (Map<Character, Long> map : ValidatingSink.maps) {
-				for (Long count : map.values()) {
-					assertEquals(NUM_STRINGS / 40, count.longValue());
-				}
+		// verify that we counted exactly right
+		assertEquals(NUM_STRINGS, filterSum);
+		assertEquals(NUM_STRINGS, mapSum);
+		assertEquals(NUM_STRINGS, countSum);
+
+		for (Map<Character, Long> map : ValidatingSink.maps) {
+			for (Long count : map.values()) {
+				assertEquals(NUM_STRINGS / 40, count.longValue());
 			}
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
@@ -406,28 +360,4 @@ public class StateCheckpoinedITCase {
 			counts.putAll(state);
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom Type Classes
-	// --------------------------------------------------------------------------------------------
-
-	public static class PrefixCount implements Serializable {
-
-		public String prefix;
-		public String value;
-		public long count;
-
-		public PrefixCount() {}
-
-		public PrefixCount(String prefix, String value, long count) {
-			this.prefix = prefix;
-			this.value = value;
-			this.count = count;
-		}
-
-		@Override
-		public String toString() {
-			return prefix + " / " + value;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
new file mode 100644
index 0000000..4c1ac4e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test for the {@link CheckpointNotifier} interface. The test ensures that
+ * {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for some completed
+ * checkpoints, that it is called at most once for any checkpoint id and that it is not
+ * called for a deliberately failed checkpoint.
+ *
+ * <p>
+ * Note that as a result of doing the checks on the task level there is no way to verify
+ * that the {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for every
+ * successfully completed checkpoint.
+ */
+@SuppressWarnings("serial")
+public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase {
+
+	final long NUM_STRINGS = 10_000_000L;
+
+	/**
+	 * Runs the following program:
+	 *
+	 * <pre>
+	 *     [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+	 * </pre>
+	 */
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+
+		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+
+		stream
+				// -------------- first vertex, chained to the src ----------------
+				.filter(new StringRichFilterFunction())
+
+						// -------------- second vertex, applying the co-map ----------------
+				.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
+
+				// -------------- third vertex - prefix-count map (the failure is injected in the reducer) ----------------
+				.map(new StringPrefixCountRichMapFunction())
+				.startNewChain()
+				.map(new IdentityMapFunction())
+
+						// -------------- fourth vertex - reducer and the sink ----------------
+				.groupBy("prefix")
+				.reduce(new OnceFailingReducer(NUM_STRINGS))
+				.addSink(new SinkFunction<PrefixCount>() {
+					@Override
+					public void invoke(PrefixCount value) {
+						// do nothing
+					}
+				});
+	}
+
+	@Override
+	public void postSubmit() {
+		List[][] checkList = new List[][]{	StringGeneratingSourceFunction.completedCheckpoints,
+				IdentityMapFunction.completedCheckpoints,
+				StringPrefixCountRichMapFunction.completedCheckpoints,
+				LeftIdentityCoRichFlatMapFunction.completedCheckpoints};
+
+		for(List[] parallelNotifications : checkList) {
+			for (int i = 0; i < PARALLELISM; i++){
+				List<Long> notifications = parallelNotifications[i];
+				assertTrue("No checkpoint notification was received.",
+						notifications.size() > 0);
+				assertFalse("Failure checkpoint was marked as completed.",
+						notifications.contains(OnceFailingReducer.failureCheckpointID));
+				assertTrue("Checkpoint notification was received multiple times",
+						notifications.size() == new HashSet<Long>(notifications).size());
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom Functions
+	// --------------------------------------------------------------------------------------------
+
+	private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
+			implements  ParallelSourceFunction<String>, CheckpointNotifier {
+
+		// operator life cycle
+		private volatile boolean isRunning;
+
+		// operator behaviour
+		private final long numElements;
+		private Random rnd;
+
+		private StringBuilder stringBuilder;
+		private OperatorState<Integer> index;
+		private int step;
+
+		// test behaviour
+		private int subtaskId;
+		public static List[] completedCheckpoints = new List[PARALLELISM];
+
+		StringGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws IOException {
+			rnd = new Random();
+			stringBuilder = new StringBuilder();
+			step = getRuntimeContext().getNumberOfParallelSubtasks();
+			subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+			index = getRuntimeContext().getOperatorState("index", subtaskId, false);
+
+			// Create a collection on the first open
+			if (completedCheckpoints[subtaskId] == null) {
+				completedCheckpoints[subtaskId] = new ArrayList();
+			}
+
+			isRunning = true;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			while (isRunning && index.value() < numElements) {
+				char first = (char) ((index.value() % 40) + 40);
+
+				stringBuilder.setLength(0);
+				stringBuilder.append(first);
+
+				String result = randomString(stringBuilder, rnd);
+
+				synchronized (lockingObject) {
+					index.update(index.value() + step);
+					ctx.collect(result);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		private static String randomString(StringBuilder bld, Random rnd) {
+			final int len = rnd.nextInt(10) + 5;
+
+			for (int i = 0; i < len; i++) {
+				char next = (char) (rnd.nextInt(20000) + 33);
+				bld.append(next);
+			}
+
+			return bld.toString();
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			completedCheckpoints[subtaskId].add(checkpointId);
+		}
+	}
+
+	private static class IdentityMapFunction extends RichMapFunction<PrefixCount, PrefixCount>
+			implements CheckpointNotifier {
+
+		public static List[] completedCheckpoints = new List[PARALLELISM];
+		private int subtaskId;
+
+		@Override
+		public PrefixCount map(PrefixCount value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public void open(Configuration conf) throws IOException {
+			subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+			// Create a collection on the first open
+			if (completedCheckpoints[subtaskId] == null) {
+				completedCheckpoints[subtaskId] = new ArrayList();
+			}
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			completedCheckpoints[subtaskId].add(checkpointId);
+		}
+	}
+
+	private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> implements Checkpointed<Long>{
+
+		private static volatile boolean hasFailed = false;
+		public static volatile long failureCheckpointID;
+
+		private final long numElements;
+
+		private long failurePos;
+		private long count;
+
+
+		OnceFailingReducer(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) {
+			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			// Random#nextLong() may be negative and % keeps the sign, so take the absolute remainder to stay in [min, max)
+			failurePos = Math.abs(new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+			count = 0;
+		}
+
+		@Override
+		public PrefixCount reduce(PrefixCount value1, PrefixCount value2) throws Exception {
+			count++;
+			value1.count += value2.count;
+			return value1;
+		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				failureCheckpointID = checkpointId;
+				throw new Exception("Test Failure");
+			}
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
+		}
+	}
+
+	private static class StringRichFilterFunction implements FilterFunction<String> {
+		@Override
+		public boolean filter(String value) {
+			return value.length() < 100;
+		}
+	}
+
+	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
+			implements CheckpointNotifier {
+
+		public static List[] completedCheckpoints = new List[PARALLELISM];
+		private int subtaskId;
+
+		@Override
+		public void open(Configuration conf) throws IOException {
+			subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+			// Create a collection on the first open
+			if (completedCheckpoints[subtaskId] == null) {
+				completedCheckpoints[subtaskId] = new ArrayList();
+			}
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			completedCheckpoints[subtaskId].add(checkpointId);
+		}
+
+		@Override
+		public PrefixCount map(String value) throws IOException {
+			return new PrefixCount(value.substring(0, 1), value, 1L);
+		}
+	}
+
+	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String>
+			implements CheckpointNotifier {
+
+		public static List[] completedCheckpoints = new List[PARALLELISM];
+		private int subtaskId;
+
+		@Override
+		public void open(Configuration conf) throws IOException {
+			subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+			// Create a collection on the first open
+			if (completedCheckpoints[subtaskId] == null) {
+				completedCheckpoints[subtaskId] = new ArrayList();
+			}
+		}
+
+		@Override
+		public void flatMap1(String value, Collector<String> out) throws IOException {
+			out.collect(value);
+		}
+
+		@Override
+		public void flatMap2(String value, Collector<String> out) throws IOException {
+			// we ignore the values from the second input
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			completedCheckpoints[subtaskId].add(checkpointId);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 93dda5f..d54d425 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -18,21 +18,9 @@
 
 package org.apache.flink.test.checkpointing;
 
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -40,56 +28,25 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
  * 
  * The test triggers a failure after a while and verifies that, after completion, the
- * state reflects the "exactly once" semantics.
+ * state defined with the {@link Checkpointed} interface reflects the "exactly once" semantics.
  */
 @SuppressWarnings("serial")
-public class StreamCheckpointingITCase {
-
-	private static final int NUM_TASK_MANAGERS = 2;
-	private static final int NUM_TASK_SLOTS = 3;
-	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
-	private static ForkableFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-			
-			cluster = new ForkableFlinkMiniCluster(config, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
-	}
+public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
+	final long NUM_STRINGS = 10_000_000L;
 
 	/**
 	 * Runs the following program:
@@ -98,78 +55,63 @@ public class StreamCheckpointingITCase {
 	 *     [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 	 * </pre>
 	 */
-	@Test
-	public void runCheckpointedProgram() {
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+
+		stream
+				// -------------- first vertex, chained to the source ----------------
+				.filter(new StringRichFilterFunction()).shuffle()
+
+				// -------------- second vertex - the stateful one that also fails ----------------
+				.map(new StringPrefixCountRichMapFunction())
+				.startNewChain()
+				.map(new StatefulCounterFunction())
+
+						// -------------- third vertex - counter and the sink ----------------
+				.groupBy("prefix")
+				.map(new OnceFailingPrefixCounter(NUM_STRINGS))
+				.addSink(new SinkFunction<PrefixCount>() {
+
+					@Override
+					public void invoke(PrefixCount value) throws Exception {
+						// Do nothing here
+					}
+				});
+	}
 
-		final long NUM_STRINGS = 10000000L;
-		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-		
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-																	"localhost", cluster.getJobManagerRPCPort());
-			env.setParallelism(PARALLELISM);
-			env.enableCheckpointing(500);
-			env.getConfig().disableSysoutLogging();
-
-			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
-			
-			stream
-					// -------------- first vertex, chained to the source ----------------
-					.filter(new StringRichFilterFunction()).shuffle()
-
-					// -------------- seconds vertex - the stateful one that also fails ----------------
-					.map(new StringPrefixCountRichMapFunction())
-					.startNewChain()
-					.map(new StatefulCounterFunction())
-
-					// -------------- third vertex - counter and the sink ----------------
-					.groupBy("prefix")
-					.map(new OnceFailingPrefixCounter(NUM_STRINGS))
-					.addSink(new SinkFunction<PrefixCount>() {
-
-						@Override
-						public void invoke(PrefixCount value) throws Exception {
-							// Do nothing here
-						}
-					});
-
-			env.execute();
-			
-			long filterSum = 0;
-			for (long l : StringRichFilterFunction.counts) {
-				filterSum += l;
-			}
+	@Override
+	public void postSubmit() {
+		long filterSum = 0;
+		for (long l : StringRichFilterFunction.counts) {
+			filterSum += l;
+		}
 
-			long mapSum = 0;
-			for (long l : StringPrefixCountRichMapFunction.counts) {
-				mapSum += l;
-			}
+		long mapSum = 0;
+		for (long l : StringPrefixCountRichMapFunction.counts) {
+			mapSum += l;
+		}
 
-			long countSum = 0;
-			for (long l : StatefulCounterFunction.counts) {
-				countSum += l;
-			}
-			
-			long reduceInputCount = 0;
-			for(long l: OnceFailingPrefixCounter.counts){
-				reduceInputCount += l;
-			}
-			
-			assertEquals(NUM_STRINGS, filterSum);
-			assertEquals(NUM_STRINGS, mapSum);
-			assertEquals(NUM_STRINGS, countSum);
-			assertEquals(NUM_STRINGS, reduceInputCount);
-			// verify that we counted exactly right
-			for (Long count : OnceFailingPrefixCounter.prefixCounts.values()) {
-				assertEquals(new Long(NUM_STRINGS / 40), count);
-			}
+		long countSum = 0;
+		for (long l : StatefulCounterFunction.counts) {
+			countSum += l;
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+
+		long reduceInputCount = 0;
+		for(long l: OnceFailingPrefixCounter.counts){
+			reduceInputCount += l;
+		}
+
+		assertEquals(NUM_STRINGS, filterSum);
+		assertEquals(NUM_STRINGS, mapSum);
+		assertEquals(NUM_STRINGS, countSum);
+		assertEquals(NUM_STRINGS, reduceInputCount);
+		// verify that we counted exactly right
+		for (Long count : OnceFailingPrefixCounter.prefixCounts.values()) {
+			assertEquals(new Long(NUM_STRINGS / 40), count);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
@@ -246,27 +188,31 @@ public class StreamCheckpointingITCase {
 		}
 	}
 	
-	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements Checkpointed<Long> {
 
-		private OperatorState<Long> count;
+		private long count;
 		static final long[] counts = new long[PARALLELISM];
 
 		@Override
 		public PrefixCount map(PrefixCount value) throws Exception {
-			count.update(count.value() + 1);
+			count++;
 			return value;
 		}
 
 		@Override
-		public void open(Configuration conf) throws IOException {
-			count = getRuntimeContext().getOperatorState("count", 0L, false);
+		public void close() throws IOException {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
 		}
 
 		@Override
-		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
 		}
-		
 	}
 	
 	private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount> {
@@ -320,30 +266,6 @@ public class StreamCheckpointingITCase {
 			return value;
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom Type Classes
-	// --------------------------------------------------------------------------------------------
-
-	public static class PrefixCount implements Serializable {
-
-		public String prefix;
-		public String value;
-		public long count;
-
-		public PrefixCount() {}
-
-		public PrefixCount(String prefix, String value, long count) {
-			this.prefix = prefix;
-			this.value = value;
-			this.count = count;
-		}
-
-		@Override
-		public String toString() {
-			return prefix + " / " + value;
-		}
-	}
 
 	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
new file mode 100644
index 0000000..2993315
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Test base for fault-tolerant streaming programs.
+ */
+@SuppressWarnings("serial")
+public abstract class StreamFaultToleranceTestBase {
+
+	protected static final int NUM_TASK_MANAGERS = 2;
+	protected static final int NUM_TASK_SLOTS = 3;
+	protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void startCluster() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			
+			cluster = new ForkableFlinkMiniCluster(config, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to start test cluster: " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void shutdownCluster() {
+		try {
+			cluster.shutdown();
+			cluster = null;
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to stop test cluster: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * Implementations are expected to assemble the test topology in this method
+	 * using the provided {@link StreamExecutionEnvironment}.
+	 */
+	abstract public void testProgram(StreamExecutionEnvironment env);
+
+	/**
+	 * Implementations are expected to provide the checks here that verify the correct behavior.
+	 */
+	abstract public void postSubmit();
+
+	/**
+	 * Runs the test program defined in {@link #testProgram(StreamExecutionEnvironment)},
+	 * followed by the checks in {@link #postSubmit}.
+	 */
+	@Test
+	public void runCheckpointedProgram() {
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getJobManagerRPCPort());
+			env.setParallelism(PARALLELISM);
+			env.enableCheckpointing(500);
+			env.getConfig().disableSysoutLogging();
+
+			testProgram(env);
+
+			env.execute();
+
+			postSubmit();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Frequently used utilities
+	// --------------------------------------------------------------------------------------------
+
+	public static class PrefixCount implements Serializable {
+
+		public String prefix;
+		public String value;
+		public long count;
+
+		public PrefixCount() {}
+
+		public PrefixCount(String prefix, String value, long count) {
+			this.prefix = prefix;
+			this.value = value;
+			this.count = count;
+		}
+
+		@Override
+		public String toString() {
+			return prefix + " / " + value;
+		}
+	}
+
+}