Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:53 UTC

[02/47] flink git commit: [FLINK-2354] [runtime] Add job graph and checkpoint recovery

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
deleted file mode 100644
index f517f83..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * A simple test that runs a streaming topology with checkpointing enabled.
- *
- * The test triggers a failure after a while and verifies that, after completion, the
- * state defined with either the {@link OperatorState} or the {@link Checkpointed}
- * interface reflects the "exactly once" semantics.
- * 
- * The test throttles the input until at least two checkpoints are completed, to make sure that
- * the recovery does not fall back to "square one" (which would naturally lead to correct
- * results without testing the checkpointing).
- */
-@SuppressWarnings("serial")
-public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StateCheckpoinedITCase.class);
-
-	final long NUM_STRINGS = 10_000_000L;
-
-	/**
-	 * Runs the following program:
-	 *
-	 * <pre>
-	 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
-	 * </pre>
-	 */
-	@Override
-	public void testProgram(StreamExecutionEnvironment env) {
-		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-		final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
-		final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
-
-		final long failurePos = (long) (failurePosMin + new Random().nextDouble() * (failurePosMax - failurePosMin));
-
-		env.enableCheckpointing(200);
-
-		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
-
-		stream
-				// first vertex, chained to the source
-				// this filter throttles the flow until at least one checkpoint
-				// is complete, to make sure this program does not run without any completed checkpoints
-				.filter(new StringRichFilterFunction())
-
-						// -------------- second vertex - one-to-one connected ----------------
-				.map(new StringPrefixCountRichMapFunction())
-				.startNewChain()
-				.map(new StatefulCounterFunction())
-
-						// -------------- third vertex - reducer and the sink ----------------
-				.partitionByHash("prefix")
-				.flatMap(new OnceFailingAggregator(failurePos))
-				.addSink(new ValidatingSink());
-	}
-
-	@Override
-	public void postSubmit() {
-		
-		//assertTrue("Test inconclusive: failure occurred before first checkpoint",
-		//		OnceFailingAggregator.wasCheckpointedBeforeFailure);
-		if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
-			LOG.warn("Test inconclusive: failure occurred before first checkpoint");
-		}
-		
-		long filterSum = 0;
-		for (long l : StringRichFilterFunction.counts) {
-			filterSum += l;
-		}
-
-		long mapSum = 0;
-		for (long l : StringPrefixCountRichMapFunction.counts) {
-			mapSum += l;
-		}
-
-		long countSum = 0;
-		for (long l : StatefulCounterFunction.counts) {
-			countSum += l;
-		}
-
-		// verify that we counted exactly right
-		assertEquals(NUM_STRINGS, filterSum);
-		assertEquals(NUM_STRINGS, mapSum);
-		assertEquals(NUM_STRINGS, countSum);
-
-		for (Map<Character, Long> map : ValidatingSink.maps) {
-			for (Long count : map.values()) {
-				assertEquals(NUM_STRINGS / 40, count.longValue());
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom Functions
-	// --------------------------------------------------------------------------------------------
-	
-	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> 
-			implements CheckpointedAsynchronously<Integer>
-	{
-		private final long numElements;
-
-		private int index;
-
-		private volatile boolean isRunning = true;
-		
-		
-		StringGeneratingSourceFunction(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
-			final Object lockingObject = ctx.getCheckpointLock();
-
-			final Random rnd = new Random();
-			final StringBuilder stringBuilder = new StringBuilder();
-			
-			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-			
-			if (index == 0) {
-				index = getRuntimeContext().getIndexOfThisSubtask();
-			}
-
-			while (isRunning && index < numElements) {
-				char first = (char) ((index % 40) + 40);
-
-				stringBuilder.setLength(0);
-				stringBuilder.append(first);
-
-				String result = randomString(stringBuilder, rnd);
-
-				synchronized (lockingObject) {
-					index += step;
-					ctx.collect(result);
-				}
-			}
-		}
-		
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
-		private static String randomString(StringBuilder bld, Random rnd) {
-			final int len = rnd.nextInt(10) + 5;
-
-			for (int i = 0; i < len; i++) {
-				char next = (char) (rnd.nextInt(20000) + 33);
-				bld.append(next);
-			}
-
-			return bld.toString();
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			index = state;
-		}
-	}
-
-	private static class StringRichFilterFunction extends RichFilterFunction<String> 
-			implements Checkpointed<Long> {
-
-		static final long[] counts = new long[PARALLELISM];
-		
-		private long count;
-		
-		@Override
-		public boolean filter(String value) throws Exception {
-			count++;
-			return value.length() < 100; // should always be true
-		}
-
-		@Override
-		public void close() {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
-		}
-
-		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
-		}
-
-		@Override
-		public void restoreState(Long state) {
-			count = state;
-		}
-	}
-
-	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> 
-			implements CheckpointedAsynchronously<Long> {
-		
-		static final long[] counts = new long[PARALLELISM];
-
-		private long count;
-		
-		@Override
-		public PrefixCount map(String value) {
-			count++;
-			return new PrefixCount(value.substring(0, 1), value, 1L);
-		}
-
-		@Override
-		public void close() {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
-		}
-
-		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
-		}
-
-		@Override
-		public void restoreState(Long state) {
-			count = state;
-		}
-	}
-	
-	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
-		implements Checkpointed<Long> {
-
-		static final long[] counts = new long[PARALLELISM];
-		
-		private long count;
-
-		@Override
-		public PrefixCount map(PrefixCount value) throws Exception {
-			count++;
-			return value;
-		}
-
-		@Override
-		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
-		}
-
-		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
-		}
-
-		@Override
-		public void restoreState(Long state) {
-			count = state;
-		}
-	}
-	
-	private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> 
-		implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier {
-
-		static boolean wasCheckpointedBeforeFailure = false;
-		
-		private static volatile boolean hasFailed = false;
-
-		private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
-		
-		private long failurePos;
-		private long count;
-		
-		private boolean wasCheckpointed;
-		
-
-		OnceFailingAggregator(long failurePos) {
-			this.failurePos = failurePos;
-		}
-		
-		@Override
-		public void open(Configuration parameters) {
-			count = 0;
-		}
-
-		@Override
-		public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception {
-			count++;
-			if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) {
-				wasCheckpointedBeforeFailure = wasCheckpointed;
-				hasFailed = true;
-				throw new Exception("Test Failure");
-			}
-			
-			PrefixCount curr = aggregationMap.get(value.prefix);
-			if (curr == null) {
-				aggregationMap.put(value.prefix, value);
-				out.collect(value);
-			}
-			else {
-				curr.count += value.count;
-				out.collect(curr);
-			}
-		}
-
-		@Override
-		public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
-			return aggregationMap;
-		}
-
-		@Override
-		public void restoreState(HashMap<String, PrefixCount> state) {
-			aggregationMap.putAll(state);
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			this.wasCheckpointed = true;
-		}
-	}
-
-	private static class ValidatingSink extends RichSinkFunction<PrefixCount> 
-			implements Checkpointed<HashMap<Character, Long>> {
-
-		@SuppressWarnings("unchecked")
-		private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
-		
-		private HashMap<Character, Long> counts = new HashMap<Character, Long>();
-
-		@Override
-		public void invoke(PrefixCount value) {
-			Character first = value.prefix.charAt(0);
-			Long previous = counts.get(first);
-			if (previous == null) {
-				counts.put(first, value.count);
-			} else {
-				counts.put(first, Math.max(previous, value.count));
-			}
-		}
-
-		@Override
-		public void close() throws Exception {
-			maps[getRuntimeContext().getIndexOfThisSubtask()] = counts;
-		}
-
-		@Override
-		public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
-			return counts;
-		}
-
-		@Override
-		public void restoreState(HashMap<Character, Long> state) {
-			counts.putAll(state);
-		}
-	}
-}
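
A note on the pattern used by StringGeneratingSourceFunction above: the source
advances its checkpointed offset and emits the matching record inside the same
synchronized block on ctx.getCheckpointLock(), so a snapshot can never observe
an emitted element without the corresponding index update. A minimal sketch of
that pattern, reusing the Flink 0.10-era APIs that appear in this diff (the
class name is illustrative):

    import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    /** Minimal sketch: emit and advance the checkpointed offset atomically. */
    public class CheckpointedCounterSource extends RichParallelSourceFunction<Long>
            implements Checkpointed<Long> {

        private final long numElements;
        private long index;                       // restored after a failure
        private volatile boolean isRunning = true;

        public CheckpointedCounterSource(long numElements) {
            this.numElements = numElements;
        }

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            final Object lock = ctx.getCheckpointLock();
            while (isRunning && index < numElements) {
                // state update and emission are atomic w.r.t. checkpoints
                synchronized (lock) {
                    ctx.collect(index);
                    index++;
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return index;
        }

        @Override
        public void restoreState(Long state) {
            index = state;
        }
    }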

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
new file mode 100644
index 0000000..d7c06f6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled.
+ *
+ * The test triggers a failure after a while and verifies that, after completion, the
+ * state defined with either the {@link OperatorState} or the {@link Checkpointed}
+ * interface reflects the "exactly once" semantics.
+ * 
+ * The test throttles the input until at least two checkpoints are completed, to make sure that
+ * the recovery does not fall back to "square one" (which would naturally lead to correct
+ * results without testing the checkpointing).
+ */
+@SuppressWarnings("serial")
+public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StateCheckpointedITCase.class);
+
+	final long NUM_STRINGS = 10_000_000L;
+
+	/**
+	 * Runs the following program:
+	 *
+	 * <pre>
+	 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
+	 * </pre>
+	 */
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+		final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
+		final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
+
+		final long failurePos = (long) (failurePosMin + new Random().nextDouble() * (failurePosMax - failurePosMin));
+
+		env.enableCheckpointing(200);
+
+		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+
+		stream
+				// first vertex, chained to the source
+				// this filter throttles the flow until at least one checkpoint
+				// is complete, to make sure this program does not run without any completed checkpoints
+				.filter(new StringRichFilterFunction())
+
+						// -------------- second vertex - one-to-one connected ----------------
+				.map(new StringPrefixCountRichMapFunction())
+				.startNewChain()
+				.map(new StatefulCounterFunction())
+
+						// -------------- third vertex - reducer and the sink ----------------
+				.partitionByHash("prefix")
+				.flatMap(new OnceFailingAggregator(failurePos))
+				.addSink(new ValidatingSink());
+	}
+
+	@Override
+	public void postSubmit() {
+		
+		//assertTrue("Test inconclusive: failure occurred before first checkpoint",
+		//		OnceFailingAggregator.wasCheckpointedBeforeFailure);
+		if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
+			LOG.warn("Test inconclusive: failure occurred before first checkpoint");
+		}
+		
+		long filterSum = 0;
+		for (long l : StringRichFilterFunction.counts) {
+			filterSum += l;
+		}
+
+		long mapSum = 0;
+		for (long l : StringPrefixCountRichMapFunction.counts) {
+			mapSum += l;
+		}
+
+		long countSum = 0;
+		for (long l : StatefulCounterFunction.counts) {
+			countSum += l;
+		}
+
+		// verify that we counted exactly right
+		assertEquals(NUM_STRINGS, filterSum);
+		assertEquals(NUM_STRINGS, mapSum);
+		assertEquals(NUM_STRINGS, countSum);
+
+		for (Map<Character, Long> map : ValidatingSink.maps) {
+			for (Long count : map.values()) {
+				assertEquals(NUM_STRINGS / 40, count.longValue());
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom Functions
+	// --------------------------------------------------------------------------------------------
+	
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> 
+			implements CheckpointedAsynchronously<Integer>
+	{
+		private final long numElements;
+
+		private int index;
+
+		private volatile boolean isRunning = true;
+		
+		
+		StringGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			final Random rnd = new Random();
+			final StringBuilder stringBuilder = new StringBuilder();
+			
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+			
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
+
+			while (isRunning && index < numElements) {
+				char first = (char) ((index % 40) + 40);
+
+				stringBuilder.setLength(0);
+				stringBuilder.append(first);
+
+				String result = randomString(stringBuilder, rnd);
+
+				synchronized (lockingObject) {
+					index += step;
+					ctx.collect(result);
+				}
+			}
+		}
+		
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		private static String randomString(StringBuilder bld, Random rnd) {
+			final int len = rnd.nextInt(10) + 5;
+
+			for (int i = 0; i < len; i++) {
+				char next = (char) (rnd.nextInt(20000) + 33);
+				bld.append(next);
+			}
+
+			return bld.toString();
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+	}
+
+	private static class StringRichFilterFunction extends RichFilterFunction<String> 
+			implements Checkpointed<Long> {
+
+		static final long[] counts = new long[PARALLELISM];
+		
+		private long count;
+		
+		@Override
+		public boolean filter(String value) throws Exception {
+			count++;
+			return value.length() < 100; // should always be true
+		}
+
+		@Override
+		public void close() {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
+		}
+	}
+
+	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> 
+			implements CheckpointedAsynchronously<Long> {
+		
+		static final long[] counts = new long[PARALLELISM];
+
+		private long count;
+		
+		@Override
+		public PrefixCount map(String value) {
+			count++;
+			return new PrefixCount(value.substring(0, 1), value, 1L);
+		}
+
+		@Override
+		public void close() {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
+		}
+	}
+	
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
+		implements Checkpointed<Long> {
+
+		static final long[] counts = new long[PARALLELISM];
+		
+		private long count;
+
+		@Override
+		public PrefixCount map(PrefixCount value) throws Exception {
+			count++;
+			return value;
+		}
+
+		@Override
+		public void close() throws IOException {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
+		}
+	}
+	
+	private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> 
+		implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier {
+
+		static boolean wasCheckpointedBeforeFailure = false;
+		
+		private static volatile boolean hasFailed = false;
+
+		private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
+		
+		private long failurePos;
+		private long count;
+		
+		private boolean wasCheckpointed;
+		
+
+		OnceFailingAggregator(long failurePos) {
+			this.failurePos = failurePos;
+		}
+		
+		@Override
+		public void open(Configuration parameters) {
+			count = 0;
+		}
+
+		@Override
+		public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				wasCheckpointedBeforeFailure = wasCheckpointed;
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+			
+			PrefixCount curr = aggregationMap.get(value.prefix);
+			if (curr == null) {
+				aggregationMap.put(value.prefix, value);
+				out.collect(value);
+			}
+			else {
+				curr.count += value.count;
+				out.collect(curr);
+			}
+		}
+
+		@Override
+		public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
+			return aggregationMap;
+		}
+
+		@Override
+		public void restoreState(HashMap<String, PrefixCount> state) {
+			aggregationMap.putAll(state);
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			this.wasCheckpointed = true;
+		}
+	}
+
+	private static class ValidatingSink extends RichSinkFunction<PrefixCount> 
+			implements Checkpointed<HashMap<Character, Long>> {
+
+		@SuppressWarnings("unchecked")
+		private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
+		
+		private HashMap<Character, Long> counts = new HashMap<Character, Long>();
+
+		@Override
+		public void invoke(PrefixCount value) {
+			Character first = value.prefix.charAt(0);
+			Long previous = counts.get(first);
+			if (previous == null) {
+				counts.put(first, value.count);
+			} else {
+				counts.put(first, Math.max(previous, value.count));
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			maps[getRuntimeContext().getIndexOfThisSubtask()] = counts;
+		}
+
+		@Override
+		public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
+			return counts;
+		}
+
+		@Override
+		public void restoreState(HashMap<Character, Long> state) {
+			counts.putAll(state);
+		}
+	}
+}
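
The expected counts in postSubmit() follow from a simple counting argument:
subtask i of the source emits exactly the indices i, i + step, i + 2*step, ...
(with step = parallelism), so across subtasks every index in [0, NUM_STRINGS)
is emitted exactly once, and since the prefix character depends only on
index % 40, each of the 40 prefixes occurs NUM_STRINGS / 40 times (hence the
assert that 40 divides NUM_STRINGS). A small standalone check of that
arithmetic, with scaled-down constants:

    /** Standalone check: interleaved subtask indices hit each of the 40 prefixes equally often. */
    public class PrefixCountCheck {
        public static void main(String[] args) {
            final int parallelism = 4;
            final long numStrings = 4_000L;    // scaled down; must be divisible by 40
            final long[] perPrefix = new long[40];

            // mimic StringGeneratingSourceFunction: subtask i emits i, i + step, i + 2*step, ...
            for (int subtask = 0; subtask < parallelism; subtask++) {
                for (long index = subtask; index < numStrings; index += parallelism) {
                    perPrefix[(int) (index % 40)]++;
                }
            }

            for (long count : perPrefix) {
                if (count != numStrings / 40) {
                    throw new AssertionError("expected " + (numStrings / 40) + ", got " + count);
                }
            }
            System.out.println("each prefix occurs exactly " + (numStrings / 40) + " times");
        }
    }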

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
new file mode 100644
index 0000000..ba5ff1c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Option;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.createTempDirectory;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Verify behaviour in case of JobManager process failure during job execution.
+ *
+ * <p>The test works with multiple JobManager processes by spawning JVMs.
+ *
+ * <p>Initially, it starts two TaskManagers (2 slots each) and, over the course of the test, two JobManager JVMs.
+ *
+ * <p>It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the existence of
+ * temporary files. It then kills the leading JobManager process. The recovery should restart the
+ * tasks on the new JobManager.
+ *
+ * <p>This follows the same structure as {@link AbstractTaskManagerProcessFailureRecoveryTest}.
+ */
+public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends TestLogger {
+
+	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+	private final static FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	private static final File FileStateBackendBasePath;
+
+	static {
+		try {
+			FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error in test setup. Could not create directory.", e);
+		}
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (ZooKeeper != null) {
+			ZooKeeper.shutdown();
+		}
+
+		if (FileStateBackendBasePath != null) {
+			FileUtils.deleteDirectory(FileStateBackendBasePath);
+		}
+	}
+
+	@Before
+	public void cleanUp() throws Exception {
+		ZooKeeper.deleteAll();
+
+		FileUtils.cleanDirectory(FileStateBackendBasePath);
+	}
+
+	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
+	protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
+	protected static final String PROCEED_MARKER_FILE = "proceed";
+
+	protected static final int PARALLELISM = 4;
+
+	/**
+	 * Test program with JobManager failure.
+	 *
+	 * @param zkQuorum ZooKeeper quorum to connect to
+	 * @param coordinateDir Coordination directory
+	 * @throws Exception
+	 */
+	public abstract void testJobManagerFailure(String zkQuorum, File coordinateDir) throws Exception;
+
+	@Test
+	public void testJobManagerProcessFailure() throws Exception {
+		// Config
+		final int numberOfJobManagers = 2;
+		final int numberOfTaskManagers = 2;
+		final int numberOfSlotsPerTaskManager = 2;
+
+		assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
+
+		// Setup
+		// Test actor system
+		ActorSystem testActorSystem;
+
+		// Job managers
+		final JobManagerProcess[] jmProcess = new JobManagerProcess[numberOfJobManagers];
+
+		// Task managers
+		final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
+
+		// Leader election service
+		LeaderRetrievalService leaderRetrievalService = null;
+
+		// Coordination between the processes goes through a directory
+		File coordinateTempDir = null;
+
+		try {
+			final Deadline deadline = TestTimeOut.fromNow();
+
+			// Coordination directory
+			coordinateTempDir = createTempDirectory();
+
+			// Job Managers
+			Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+					ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+
+			// Start first process
+			jmProcess[0] = new JobManagerProcess(0, config);
+			jmProcess[0].createAndStart();
+
+			// Task manager configuration
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+			// Start the task manager process
+			for (int i = 0; i < numberOfTaskManagers; i++) {
+				tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+				TaskManager.startTaskManagerComponentsAndActor(
+						config, tmActorSystem[i], "localhost",
+						Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
+						false, StreamingMode.STREAMING, TaskManager.class);
+			}
+
+			// Test actor system
+			testActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+
+			jmProcess[0].getActorRef(testActorSystem, deadline.timeLeft());
+
+			// Leader listener
+			TestingListener leaderListener = new TestingListener();
+			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+			leaderRetrievalService.start(leaderListener);
+
+			// Initial submission
+			leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+			String leaderAddress = leaderListener.getAddress();
+			UUID leaderId = leaderListener.getLeaderSessionID();
+
+			// Get the leader ref
+			ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
+			ActorGateway leaderGateway = new AkkaActorGateway(leaderRef, leaderId);
+
+			// Wait for all task managers to connect to the leading job manager
+			JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, leaderGateway,
+					deadline.timeLeft());
+
+			final File coordinateDirClosure = coordinateTempDir;
+			final Throwable[] errorRef = new Throwable[1];
+
+			// we trigger program execution in a separate thread
+			Thread programTrigger = new Thread("Program Trigger") {
+				@Override
+				public void run() {
+					try {
+						testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure);
+					}
+					catch (Throwable t) {
+						t.printStackTrace();
+						errorRef[0] = t;
+					}
+				}
+			};
+
+			//start the test program
+			programTrigger.start();
+
+			// wait until all marker files are in place, indicating that all tasks have started
+			AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
+					READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis());
+
+			// Kill one of the job managers and trigger recovery
+			jmProcess[0].destroy();
+
+			jmProcess[1] = new JobManagerProcess(1, config);
+			jmProcess[1].createAndStart();
+
+			jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft());
+
+			// we create the marker file which signals the program functions tasks that they can complete
+			AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+			programTrigger.join(deadline.timeLeft().toMillis());
+
+			// We wait for the finish marker file. We don't wait for the program trigger, because
+			// we submit in detached mode.
+			AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
+					FINISH_MARKER_FILE_PREFIX, 1, deadline.timeLeft().toMillis());
+
+			// check that the program really finished
+			assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+			// check whether the program encountered an error
+			if (errorRef[0] != null) {
+				Throwable error = errorRef[0];
+				error.printStackTrace();
+				fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+
+			for (JobManagerProcess p : jmProcess) {
+				if (p != null) {
+					p.printProcessLog();
+				}
+			}
+
+			fail(e.getMessage());
+		}
+		finally {
+			for (int i = 0; i < numberOfTaskManagers; i++) {
+				if (tmActorSystem[i] != null) {
+					tmActorSystem[i].shutdown();
+				}
+			}
+
+			if (leaderRetrievalService != null) {
+				leaderRetrievalService.stop();
+			}
+
+			for (JobManagerProcess p : jmProcess) {
+				if (p != null) {
+					p.destroy();
+				}
+			}
+
+			// Delete coordination directory
+			if (coordinateTempDir != null) {
+				try {
+					FileUtils.deleteDirectory(coordinateTempDir);
+				}
+				catch (Throwable ignored) {
+				}
+			}
+		}
+	}
+
+}
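
The file-based coordination mentioned in the Javadoc works the same way from
the task side in all of these recovery tests: each parallel task touches a
ready_<subtask> marker, then blocks (or trickles through elements) until the
driver creates the proceed marker, which it does only after the kill, and
finally touches a finish_<subtask> marker. A plain-java.io sketch of that
task-side half (the helper class and method names are illustrative, not part
of the test base classes):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    /** Sketch of the task-side half of the marker-file protocol. */
    public final class MarkerFileCoordination {

        /** Each parallel task announces readiness by touching ready_<subtask>. */
        public static void signalReady(File coordinateDir, int subtaskIndex) throws IOException {
            touch(new File(coordinateDir, "ready_" + subtaskIndex));
        }

        /** Block until the driver creates the proceed marker (done after the kill). */
        public static void awaitProceed(File coordinateDir) throws InterruptedException {
            File proceed = new File(coordinateDir, "proceed");
            while (!proceed.exists()) {
                Thread.sleep(50);
            }
        }

        /** Announce completion by touching finish_<subtask>. */
        public static void signalFinished(File coordinateDir, int subtaskIndex) throws IOException {
            touch(new File(coordinateDir, "finish_" + subtaskIndex));
        }

        private static void touch(File file) throws IOException {
            if (!file.exists()) {
                new FileOutputStream(file).close();
            }
        }

        private MarkerFileCoordination() {}
    }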

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
deleted file mode 100644
index 7e16baf..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recovery;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-/**
- * Abstract base for tests verifying the behavior of the recovery in the
- * case when a TaskManager fails (process is killed) in the middle of a job execution.
- *
- * The test works with multiple TaskManager processes by spawning JVMs.
- * Initially, it starts a JobManager in-process and two TaskManager JVMs with
- * 2 task slots each.
- * It submits a program with parallelism 4 and waits until all tasks are brought up.
- * Coordination between the test and the tasks happens via checking for the
- * existence of temporary files. It then starts another TaskManager, which is
- * guaranteed to remain empty (all tasks are already deployed) and kills one of
- * the original task managers. The recovery should restart the tasks on the new TaskManager.
- */
-public abstract class AbstractProcessFailureRecoveryTest extends TestLogger {
-
-	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
-	protected static final String PROCEED_MARKER_FILE = "proceed";
-
-	protected static final int PARALLELISM = 4;
-
-	@Test
-	public void testTaskManagerProcessFailure() {
-
-		final StringWriter processOutput1 = new StringWriter();
-		final StringWriter processOutput2 = new StringWriter();
-		final StringWriter processOutput3 = new StringWriter();
-
-		ActorSystem jmActorSystem = null;
-		Process taskManagerProcess1 = null;
-		Process taskManagerProcess2 = null;
-		Process taskManagerProcess3 = null;
-
-		File coordinateTempDir = null;
-
-		try {
-			// check that we run this test only if the java command
-			// is available on this machine
-			String javaCommand = getJavaCommandPath();
-			if (javaCommand == null) {
-				System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
-				return;
-			}
-
-			// create a logging file for the process
-			File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
-			tempLogFile.deleteOnExit();
-			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
-			// coordination between the processes goes through a directory
-			coordinateTempDir = createTempDirectory();
-
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
-			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
-			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(
-				jmConfig,
-				jmActorSystem,
-				StreamingMode.STREAMING,
-				JobManager.class,
-				MemoryArchivist.class)._1();
-
-			// the TaskManager java command
-			String[] command = new String[] {
-					javaCommand,
-					"-Dlog.level=DEBUG",
-					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
-					"-Xms80m", "-Xmx80m",
-					"-classpath", getCurrentClasspath(),
-					TaskManagerProcessEntryPoint.class.getName(),
-					String.valueOf(jobManagerPort)
-			};
-
-			// start the first two TaskManager processes
-			taskManagerProcess1 = new ProcessBuilder(command).start();
-			new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
-			taskManagerProcess2 = new ProcessBuilder(command).start();
-			new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
-
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
-
-			// the program will set a marker file in each of its parallel tasks once they are ready, so that
-			// this coordinating code is aware of this.
-			// the program will very slowly consume elements until the marker file (later created by the
-			// test driver code) is present
-			final File coordinateDirClosure = coordinateTempDir;
-			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-			// we trigger program execution in a separate thread
-			Thread programTrigger = new Thread("Program Trigger") {
-				@Override
-				public void run() {
-					try {
-						testProgram(jobManagerPort, coordinateDirClosure);
-					}
-					catch (Throwable t) {
-						t.printStackTrace();
-						errorRef.set(t);
-					}
-				}
-			};
-
-			//start the test program
-			programTrigger.start();
-
-			// wait until all marker files are in place, indicating that all tasks have started
-			// max 2 minutes
-			if (!waitForMarkerFiles(coordinateTempDir, PARALLELISM, 120000)) {
-				// check if the program failed for some reason
-				if (errorRef.get() != null) {
-					Throwable error = errorRef.get();
-					error.printStackTrace();
-					fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
-				}
-				else {
-					// no error occurred, simply a timeout
-					fail("The tasks were not started within time (" + 120000 + "msecs)");
-				}
-			}
-
-			// start the third TaskManager
-			taskManagerProcess3 = new ProcessBuilder(command).start();
-			new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
-
-			// we wait for the third TaskManager to register
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000);
-
-			// kill one of the previous TaskManagers, triggering a failure and recovery
-			taskManagerProcess1.destroy();
-			taskManagerProcess1 = null;
-
-			// we create the marker file which signals the program functions tasks that they can complete
-			touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
-
-			// wait for at most 5 minutes for the program to complete
-			programTrigger.join(300000);
-
-			// check that the program really finished
-			assertFalse("The program did not finish in time", programTrigger.isAlive());
-
-			// check whether the program encountered an error
-			if (errorRef.get() != null) {
-				Throwable error = errorRef.get();
-				error.printStackTrace();
-				fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
-			}
-
-			// all seems well :-)
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			printProcessLog("TaskManager 1", processOutput1.toString());
-			printProcessLog("TaskManager 2", processOutput2.toString());
-			printProcessLog("TaskManager 3", processOutput3.toString());
-			fail(e.getMessage());
-		}
-		catch (Error e) {
-			e.printStackTrace();
-			printProcessLog("TaskManager 1", processOutput1.toString());
-			printProcessLog("TaskManager 2", processOutput2.toString());
-			printProcessLog("TaskManager 3", processOutput3.toString());
-			throw e;
-		}
-		finally {
-			if (taskManagerProcess1 != null) {
-				taskManagerProcess1.destroy();
-			}
-			if (taskManagerProcess2 != null) {
-				taskManagerProcess2.destroy();
-			}
-			if (taskManagerProcess3 != null) {
-				taskManagerProcess3.destroy();
-			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
-			if (coordinateTempDir != null) {
-				try {
-					FileUtils.deleteDirectory(coordinateTempDir);
-				}
-				catch (Throwable t) {
-					// we can ignore this
-				}
-			}
-		}
-	}
-
-	/**
-	 * The test program should be implemented here in a form of a separate thread.
-	 * This provides a solution for checking that it has been terminated.
-	 *
-	 * @param jobManagerPort The port for submitting the topology to the local cluster
-	 * @param coordinateDir TaskManager failure will be triggered only after processes
-	 *                             have successfully created file under this directory
-	 */
-	public abstract void testProgram(int jobManagerPort, File coordinateDir) throws Exception;
-
-
-	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
-			throws Exception
-	{
-		final long deadline = System.currentTimeMillis() + maxDelay;
-		while (true) {
-			long remaining = deadline - System.currentTimeMillis();
-			if (remaining <= 0) {
-				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
-			}
-
-			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
-
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-				Integer numTMs = (Integer) Await.result(result, timeout);
-				if (numTMs == numExpected) {
-					break;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-		}
-	}
-
-	protected static void printProcessLog(String processName, String log) {
-		if (log == null || log.length() == 0) {
-			return;
-		}
-
-		System.out.println("-----------------------------------------");
-		System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
-		System.out.println("-----------------------------------------");
-		System.out.println(log);
-		System.out.println("-----------------------------------------");
-		System.out.println("		END SPAWNED PROCESS LOG");
-		System.out.println("-----------------------------------------");
-	}
-
-	protected static File createTempDirectory() throws IOException {
-		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
-		for (int i = 0; i < 10; i++) {
-			File dir = new File(tempDir, UUID.randomUUID().toString());
-			if (!dir.exists() && dir.mkdirs()) {
-				return dir;
-			}
-			System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
-		}
-
-		throw new IOException("Could not create temporary file directory");
-	}
-
-	protected static void touchFile(File file) throws IOException {
-		if (!file.exists()) {
-			new FileOutputStream(file).close();
-		}
-		if (!file.setLastModified(System.currentTimeMillis())) {
-			throw new IOException("Could not touch the file.");
-		}
-	}
-
-	protected static boolean waitForMarkerFiles(File basedir, int num, long timeout) {
-		long now = System.currentTimeMillis();
-		final long deadline = now + timeout;
-
-
-		while (now < deadline) {
-			boolean allFound = true;
-
-			for (int i = 0; i < num; i++) {
-				File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
-				if (!nextToCheck.exists()) {
-					allFound = false;
-					break;
-				}
-			}
-
-			if (allFound) {
-				return true;
-			}
-			else {
-				// not all found, wait for a bit
-				try {
-					Thread.sleep(10);
-				}
-				catch (InterruptedException e) {
-					throw new RuntimeException(e);
-				}
-
-				now = System.currentTimeMillis();
-			}
-		}
-
-		return false;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
-	 */
-	public static class TaskManagerProcessEntryPoint {
-
-		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
-		public static void main(String[] args) {
-			try {
-				int jobManagerPort = Integer.parseInt(args[0]);
-
-				Configuration cfg = new Configuration();
-				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-
-				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class);
-
-				// wait forever
-				Object lock = new Object();
-				synchronized (lock) {
-					lock.wait();
-				}
-			}
-			catch (Throwable t) {
-				LOG.error("Failed to start TaskManager process", t);
-				System.exit(1);
-			}
-		}
-	}
-
-	/**
-	 * Utility class to read the output of a process stream and forward it into a StringWriter.
-	 */
-	protected static class PipeForwarder extends Thread {
-
-		private final StringWriter target;
-		private final InputStream source;
-
-		public PipeForwarder(InputStream source, StringWriter target) {
-			super("Pipe Forwarder");
-			setDaemon(true);
-
-			this.source = source;
-			this.target = target;
-
-			start();
-		}
-
-		@Override
-		public void run() {
-			try {
-				int next;
-				while ((next = source.read()) != -1) {
-					target.write(next);
-				}
-			}
-			catch (IOException e) {
-				// terminate
-			}
-		}
-	}
-
-}
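
The spawn-a-TaskManager step above is a generic pattern: build a java command
line from the test's own JVM and classpath, point it at a small entry-point
class, and forward the child's stderr into a StringWriter so its log survives
a crash and can be printed when the test fails. A distilled sketch of that
step (the entry-point name and port value here are illustrative; the tests
above pass TaskManagerProcessEntryPoint.class.getName() and the JobManager
port):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.StringWriter;

    /** Sketch: spawn a child JVM and capture its stderr for post-mortem logging. */
    public class JvmSpawnSketch {
        public static void main(String[] args) throws IOException {
            // reuse the current JVM and classpath for the child process
            String javaCommand = System.getProperty("java.home") + "/bin/java";
            String classpath = System.getProperty("java.class.path");

            String[] command = {
                    javaCommand,
                    "-Xms80m", "-Xmx80m",
                    "-classpath", classpath,
                    "org.example.ChildEntryPoint",   // illustrative entry point
                    "6123"                           // illustrative JobManager port
            };

            final Process child = new ProcessBuilder(command).start();
            final StringWriter childStderr = new StringWriter();

            // forward stderr on a daemon thread, as PipeForwarder above does
            Thread forwarder = new Thread("stderr forwarder") {
                @Override
                public void run() {
                    try {
                        InputStream source = child.getErrorStream();
                        int next;
                        while ((next = source.read()) != -1) {
                            childStderr.write(next);
                        }
                    } catch (IOException e) {
                        // child terminated; stop forwarding
                    }
                }
            };
            forwarder.setDaemon(true);
            forwarder.start();
        }
    }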

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
new file mode 100644
index 0000000..c02fa6c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Abstract base for tests verifying the behavior of the recovery in the
+ * case when a TaskManager fails (process is killed) in the middle of a job execution.
+ *
+ * The test works with multiple TaskManager processes by spawning JVMs.
+ * Initially, it starts a JobManager in-process and two TaskManager JVMs with
+ * 2 task slots each.
+ * It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the
+ * existence of temporary files. It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends TestLogger {
+
+	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
+	protected static final String PROCEED_MARKER_FILE = "proceed";
+	protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
+
+	protected static final int PARALLELISM = 4;
+
+	@Test
+	public void testTaskManagerProcessFailure() {
+
+		final StringWriter processOutput1 = new StringWriter();
+		final StringWriter processOutput2 = new StringWriter();
+		final StringWriter processOutput3 = new StringWriter();
+
+		ActorSystem jmActorSystem = null;
+		Process taskManagerProcess1 = null;
+		Process taskManagerProcess2 = null;
+		Process taskManagerProcess3 = null;
+
+		File coordinateTempDir = null;
+
+		try {
+			// check that we run this test only if the java command
+			// is available on this machine
+			String javaCommand = getJavaCommandPath();
+			if (javaCommand == null) {
+				System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
+				return;
+			}
+
+			// create a logging file for the process
+			File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+			tempLogFile.deleteOnExit();
+			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+			// coordination between the processes goes through a directory
+			coordinateTempDir = CommonTestUtils.createTempDirectory();
+
+			// find a free port to start the JobManager
+			final int jobManagerPort = NetUtils.getAvailablePort();
+
+			// start a JobManager
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+			Configuration jmConfig = new Configuration();
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
+			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
+
+			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+			ActorRef jmActor = JobManager.startJobManagerActors(
+				jmConfig,
+				jmActorSystem,
+				StreamingMode.STREAMING,
+				JobManager.class,
+				MemoryArchivist.class)._1();
+
+			// the TaskManager java command
+			String[] command = new String[] {
+					javaCommand,
+					"-Dlog.level=DEBUG",
+					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+					"-Xms80m", "-Xmx80m",
+					"-classpath", getCurrentClasspath(),
+					TaskManagerProcessEntryPoint.class.getName(),
+					String.valueOf(jobManagerPort)
+			};
+
+			// start the first two TaskManager processes
+			taskManagerProcess1 = new ProcessBuilder(command).start();
+			new CommonTestUtils.PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
+			taskManagerProcess2 = new ProcessBuilder(command).start();
+			new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
+
+			// we wait for the JobManager to have the two TaskManagers available
+			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
+			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
+
+			// the program sets a marker file for each of its parallel tasks once the task is
+			// ready, so that this coordinating code knows when all tasks are up.
+			// the program then consumes elements very slowly until the "proceed" marker file
+			// (created later by this test driver) is present
+			final File coordinateDirClosure = coordinateTempDir;
+			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+			// we trigger program execution in a separate thread
+			Thread programTrigger = new Thread("Program Trigger") {
+				@Override
+				public void run() {
+					try {
+						testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+					}
+					catch (Throwable t) {
+						t.printStackTrace();
+						errorRef.set(t);
+					}
+				}
+			};
+
+			// start the test program
+			programTrigger.start();
+
+			// wait until all marker files are in place, indicating that all tasks have started
+			// (at most 2 minutes, matching the 120000 ms timeout below)
+			if (!waitForMarkerFiles(coordinateTempDir, READY_MARKER_FILE_PREFIX, PARALLELISM, 120000)) {
+				// check if the program failed for some reason
+				if (errorRef.get() != null) {
+					Throwable error = errorRef.get();
+					error.printStackTrace();
+					fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+				}
+				else {
+					// no error occurred, simply a timeout
+					fail("The tasks were not started within time (" + 120000 + "msecs)");
+				}
+			}
+
+			// start the third TaskManager
+			taskManagerProcess3 = new ProcessBuilder(command).start();
+			new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
+
+			// we wait for the third TaskManager to register
+			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
+			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000);
+
+			// kill one of the previous TaskManagers, triggering a failure and recovery
+			taskManagerProcess1.destroy();
+			taskManagerProcess1 = null;
+
+			// create the marker file that signals the program's tasks that they may proceed to completion
+			touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+			// wait for at most 5 minutes for the program to complete
+			programTrigger.join(300000);
+
+			// check that the program really finished
+			assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+			// check whether the program encountered an error
+			if (errorRef.get() != null) {
+				Throwable error = errorRef.get();
+				error.printStackTrace();
+				fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+			}
+
+			// all seems well :-)
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			printProcessLog("TaskManager 1", processOutput1.toString());
+			printProcessLog("TaskManager 2", processOutput2.toString());
+			printProcessLog("TaskManager 3", processOutput3.toString());
+			fail(e.getMessage());
+		}
+		catch (Error e) {
+			e.printStackTrace();
+			printProcessLog("TaskManager 1", processOutput1.toString());
+			printProcessLog("TaskManager 2", processOutput2.toString());
+			printProcessLog("TaskManager 3", processOutput3.toString());
+			throw e;
+		}
+		finally {
+			if (taskManagerProcess1 != null) {
+				taskManagerProcess1.destroy();
+			}
+			if (taskManagerProcess2 != null) {
+				taskManagerProcess2.destroy();
+			}
+			if (taskManagerProcess3 != null) {
+				taskManagerProcess3.destroy();
+			}
+			if (jmActorSystem != null) {
+				jmActorSystem.shutdown();
+			}
+			if (coordinateTempDir != null) {
+				try {
+					FileUtils.deleteDirectory(coordinateTempDir);
+				}
+				catch (Throwable t) {
+					// we can ignore this
+				}
+			}
+		}
+	}
+
+	/**
+	 * Implementations define the actual test program in this method. The test driver
+	 * runs it in a separate thread, which makes it possible to check whether the
+	 * program terminates in time.
+	 *
+	 * @param jobManagerPort The port at which the program can be submitted to the local cluster
+	 * @param coordinateDir The directory used for coordination; the TaskManager failure is
+	 *                      triggered only after the tasks have created their marker files under it
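+	 *
+	 * <p>For illustration only, an implementation might look roughly like the following
+	 * sketch (the use of a remote environment and the program body are assumptions,
+	 * not prescribed by this base class):
+	 * <pre>
+	 * public void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception {
+	 *     StreamExecutionEnvironment env =
+	 *         StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+	 *     env.setParallelism(PARALLELISM);
+	 *     // the program's tasks create the ready marker files and then block until
+	 *     // the proceed marker file exists (see the sketch near the constants above)
+	 *     env.execute();
+	 * }
+	 * </pre>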
+	 */
+	public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
+
+
+	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+			throws Exception
+	{
+		final long deadline = System.currentTimeMillis() + maxDelay;
+		while (true) {
+			long remaining = deadline - System.currentTimeMillis();
+			if (remaining <= 0) {
+				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+			}
+
+			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+			try {
+				Future<?> result = Patterns.ask(jobManager,
+						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+						new Timeout(timeout));
+				Integer numTMs = (Integer) Await.result(result, timeout);
+				if (numTMs == numExpected) {
+					break;
+				}
+			}
+			catch (TimeoutException e) {
+				// ignore and retry
+			}
+			catch (ClassCastException e) {
+				fail("Wrong response: " + e.getMessage());
+			}
+		}
+	}
+
+	protected static void printProcessLog(String processName, String log) {
+		if (log == null || log.length() == 0) {
+			return;
+		}
+
+		System.out.println("-----------------------------------------");
+		System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
+		System.out.println("-----------------------------------------");
+		System.out.println(log);
+		System.out.println("-----------------------------------------");
+		System.out.println("		END SPAWNED PROCESS LOG");
+		System.out.println("-----------------------------------------");
+	}
+
+	protected static void touchFile(File file) throws IOException {
+		if (!file.exists()) {
+			new FileOutputStream(file).close();
+		}
+		if (!file.setLastModified(System.currentTimeMillis())) {
+			throw new IOException("Could not touch the file.");
+		}
+	}
+
+	protected static boolean waitForMarkerFiles(File basedir, String prefix, int num, long timeout) {
+		long now = System.currentTimeMillis();
+		final long deadline = now + timeout;
+
+		while (now < deadline) {
+			boolean allFound = true;
+
+			for (int i = 0; i < num; i++) {
+				File nextToCheck = new File(basedir, prefix + i);
+				if (!nextToCheck.exists()) {
+					allFound = false;
+					break;
+				}
+			}
+
+			if (allFound) {
+				return true;
+			}
+			else {
+				// not all found, wait for a bit
+				try {
+					Thread.sleep(10);
+				}
+				catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				}
+
+				now = System.currentTimeMillis();
+			}
+		}
+
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+	 */
+	public static class TaskManagerProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+		public static void main(String[] args) {
+			try {
+				int jobManagerPort = Integer.parseInt(args[0]);
+
+				Configuration cfg = new Configuration();
+				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class);
+
+				// wait forever
+				Object lock = new Object();
+				synchronized (lock) {
+					lock.wait();
+				}
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start TaskManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
+}