You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/29 21:44:41 UTC

[2/2] flink git commit: [FLINK-1669] [streaming] Clean up and fix test for streaming fault tolerance with process failures

[FLINK-1669] [streaming] Clean up and fix test for streaming fault tolerance with process failures


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c284745e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c284745e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c284745e

Branch: refs/heads/master
Commit: c284745ee4612054339842789b0d87eb4f9a821d
Parents: 56afefc
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Mar 19 12:07:05 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:40:27 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java |  17 +-
 .../api/function/sink/SinkFunction.java         |   2 +-
 .../util/ProcessFailureRecoveryTestBase.java    | 456 -------------------
 .../AbstractProcessFailureRecoveryTest.java     | 419 +++++++++++++++++
 .../ProcessFailureBatchRecoveryITCase.java      |  52 +--
 .../ProcessFailureStreamingRecoveryITCase.java  | 228 ++++++----
 6 files changed, 575 insertions(+), 599 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 1307b7a..62d9c07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -159,17 +159,6 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Sets the number of times that failed tasks are re-executed. A value of zero
-	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
-	 * default value (as defined in the configuration) should be used.
-	 *
-	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
-	 */
-	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
-		config.setNumberOfExecutionRetries(numberOfExecutionRetries);
-	}
-
-	/**
 	 * Sets the maximum time frequency (milliseconds) for the flushing of the
 	 * output buffers. By default the output buffers flush frequently to provide
 	 * low latency and to aid smooth developer experience. Setting the parameter
@@ -397,10 +386,10 @@ public abstract class StreamExecutionEnvironment {
 	 *            The interval of file watching in milliseconds.
 	 * @param watchType
 	 *            The watch type of file stream. When watchType is
-	 *            {@link WatchType.ONLY_NEW_FILES}, the system processes only
-	 *            new files. {@link WatchType.REPROCESS_WITH_APPENDED} means
+	 *            {@link WatchType#ONLY_NEW_FILES}, the system processes only
+	 *            new files. {@link WatchType#REPROCESS_WITH_APPENDED} means
 	 *            that the system re-processes all contents of appended file.
-	 *            {@link WatchType.PROCESS_ONLY_APPENDED} means that the system
+	 *            {@link WatchType#PROCESS_ONLY_APPENDED} means that the system
 	 *            processes only appended contents of files.
 	 * 
 	 * @return The DataStream containing the given directory.

http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index d4ce24e..eed4234 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
 /**
  * Interface for implementing user defined sink functionality.
  *
- * @param <IN> INput type parameter.
+ * @param <IN> Input type parameter.
  */
 public interface SinkFunction<IN> extends Function, Serializable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java
deleted file mode 100644
index 3aef925..0000000
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.HashSet;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-/**
- * This is a testbase for tests verifying the behavior of the recovery in the
- * case when a TaskManager fails (process is killed) in the middle of a job execution.
- *
- * The test works with multiple task managers processes by spawning JVMs.
- * Initially, it starts a JobManager in process and two TaskManagers JVMs with
- * 2 task slots each.
- * It submits a program with parallelism 4 and waits until all tasks are brought up.
- * Coordination between the test and the tasks happens via checking for the
- * existence of temporary files. It then starts another TaskManager, which is
- * guaranteed to remain empty (all tasks are already deployed) and kills one of
- * the original task managers. The recovery should restart the tasks on the new TaskManager.
- */
-public abstract class ProcessFailureRecoveryTestBase {
-
-	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
-	protected static final String PROCEED_MARKER_FILE = "proceed";
-
-	protected static final int PARALLELISM = 4;
-
-	@Test
-	public void testTaskManagerProcessFailure() {
-
-		final StringWriter processOutput1 = new StringWriter();
-		final StringWriter processOutput2 = new StringWriter();
-		final StringWriter processOutput3 = new StringWriter();
-
-		ActorSystem jmActorSystem = null;
-		Process taskManagerProcess1 = null;
-		Process taskManagerProcess2 = null;
-		Process taskManagerProcess3 = null;
-
-		File coordinateTempDir = null;
-
-		try {
-			// check that we run this test only if the java command
-			// is available on this machine
-			String javaCommand = getJavaCommandPath();
-			if (javaCommand == null) {
-				System.out.println("---- Skipping ProcessFailureBatchRecoveryITCase : Could not find java executable");
-				return;
-			}
-
-			// create a logging file for the process
-			File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
-			tempLogFile.deleteOnExit();
-			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
-			// coordination between the processes goes through a directory
-			coordinateTempDir = createTempDirectory();
-
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
-			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
-			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
-
-			// the TaskManager java command
-			String[] command = new String[]{
-					javaCommand,
-					"-Dlog.level=DEBUG",
-					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
-					"-Xms80m", "-Xmx80m",
-					"-classpath", getCurrentClasspath(),
-					TaskManagerProcessEntryPoint.class.getName(),
-					String.valueOf(jobManagerPort)
-			};
-
-			// start the first two TaskManager processes
-			taskManagerProcess1 = new ProcessBuilder(command).start();
-			new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
-			taskManagerProcess2 = new ProcessBuilder(command).start();
-			new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
-
-			// we wait for the JobManager to have the two TaskManagers available
-			// wait for at most 20 seconds
-			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000);
-
-			// the program will set a marker file in each of its parallel tasks once they are ready, so that
-			// this coordinating code is aware of this.
-			// the program will very slowly consume elements until the marker file (later created by the
-			// test driver code) is present
-			final File coordinateDirClosure = coordinateTempDir;
-			final Throwable[] errorRef = new Throwable[1];
-
-			// get a trigger for the test program implemented by a subclass
-			Thread programTrigger = testProgram(jobManagerPort, coordinateDirClosure, errorRef);
-
-			//start the test program
-			programTrigger.start();
-
-			// wait until all marker files are in place, indicating that all tasks have started
-			// max 20 seconds
-			waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000);
-
-			// start the third TaskManager
-			taskManagerProcess3 = new ProcessBuilder(command).start();
-			new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
-
-			// we wait for the third TaskManager to register (20 seconds max)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000);
-
-			// kill one of the previous TaskManagers, triggering a failure and recovery
-			taskManagerProcess1.destroy();
-			taskManagerProcess1 = null;
-
-			// we create the marker file which signals the program functions tasks that they can complete
-			touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
-
-			// wait for at most 2 minutes for the program to complete
-			programTrigger.join(120000);
-
-			// check that the program really finished
-			assertFalse("The program did not finish in time", programTrigger.isAlive());
-
-			// apply post submission checks specified by the subclass
-			postSubmit();
-
-			// check whether the program encountered an error
-			if (errorRef[0] != null) {
-				Throwable error = errorRef[0];
-				error.printStackTrace();
-				fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
-			}
-
-			// all seems well :-)
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			printProcessLog("TaskManager 1", processOutput1.toString());
-			printProcessLog("TaskManager 2", processOutput2.toString());
-			printProcessLog("TaskManager 3", processOutput3.toString());
-			fail(e.getMessage());
-		} catch (Error e) {
-			e.printStackTrace();
-			printProcessLog("TaskManager 1", processOutput1.toString());
-			printProcessLog("TaskManager 2", processOutput2.toString());
-			printProcessLog("TaskManager 3", processOutput3.toString());
-			throw e;
-		} finally {
-			if (taskManagerProcess1 != null) {
-				taskManagerProcess1.destroy();
-			}
-			if (taskManagerProcess2 != null) {
-				taskManagerProcess2.destroy();
-			}
-			if (taskManagerProcess3 != null) {
-				taskManagerProcess3.destroy();
-			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
-			if (coordinateTempDir != null) {
-				try {
-					FileUtils.deleteDirectory(coordinateTempDir);
-				}
-				catch (Throwable t) {
-					// we can ignore this
-				}
-			}
-		}
-	}
-
-	/**
-	 * The test program should be implemented here in a form of a separate thread.
-	 * This provides a solution for checking that it has been terminated.
-	 *
-	 * @param jobManagerPort The port for submitting the topology to the local cluster
-	 * @param coordinateDirClosure taskmanager failure will be triggered only after proccesses
-	 *                             have successfully created file under this directory
-	 * @param errorRef Errors passed back to the superclass
-	 * @return thread containing the test program
-	 */
-	abstract public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef);
-
-	/**
-	 * Check to be carried out after the completion of the test program thread.
-	 * In case of failed checks {@link java.lang.AssertionError} should be thrown.
-	 *
-	 * @throws Error
-	 * @throws Exception
-	 */
-	abstract public void postSubmit() throws Error, Exception;
-
-
-	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
-			throws Exception
-	{
-		final long deadline = System.currentTimeMillis() + maxDelay;
-		while (true) {
-			long remaining = deadline - System.currentTimeMillis();
-			if (remaining <= 0) {
-				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
-			}
-
-			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
-
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-				Integer numTMs = (Integer) Await.result(result, timeout);
-				if (numTMs == numExpected) {
-					break;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-		}
-	}
-
-	public static void fileBatchHasEveryNumberLower(int n, String path) throws IOException, AssertionError {
-
-		HashSet<Integer> set = new HashSet<Integer>(n);
-
-		int counter = 0;
-		File file = new File(path + "-" + counter);
-
-		while (file.exists()) {
-
-			BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
-
-			String line = bufferedReader.readLine();
-
-			while (line != null) {
-				int num = Integer.parseInt(line);
-
-				set.add(num);
-
-				line = bufferedReader.readLine();
-			}
-
-			bufferedReader.close();
-			file.delete();
-			counter++;
-			file = new File(path + "-" + counter);
-		}
-
-		for (int i = 0; i < n; i++) {
-			if (!set.contains(i)) {
-				throw new AssertionError("Missing number: " + i);
-			}
-		}
-	}
-
-	protected static void printProcessLog(String processName, String log) {
-		if (log == null || log.length() == 0) {
-			return;
-		}
-
-		System.out.println("-----------------------------------------");
-		System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
-		System.out.println("-----------------------------------------");
-		System.out.println(log);
-		System.out.println("-----------------------------------------");
-		System.out.println("		END SPAWNED PROCESS LOG");
-		System.out.println("-----------------------------------------");
-	}
-
-	protected static File createTempDirectory() throws IOException {
-		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
-		for (int i = 0; i < 10; i++) {
-			File dir = new File(tempDir, UUID.randomUUID().toString());
-			if (!dir.exists() && dir.mkdirs()) {
-				return dir;
-			}
-			System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
-		}
-
-		throw new IOException("Could not create temporary file directory");
-	}
-
-	protected static void touchFile(File file) throws IOException {
-		if (!file.exists()) {
-			new FileOutputStream(file).close();
-		}
-		if (!file.setLastModified(System.currentTimeMillis())) {
-			throw new IOException("Could not touch the file.");
-		}
-	}
-
-	protected static void waitForMarkerFiles(File basedir, int num, long timeout) {
-		long now = System.currentTimeMillis();
-		final long deadline = now + timeout;
-
-
-		while (now < deadline) {
-			boolean allFound = true;
-
-			for (int i = 0; i < num; i++) {
-				File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
-				if (!nextToCheck.exists()) {
-					allFound = false;
-					break;
-				}
-			}
-
-			if (allFound) {
-				return;
-			}
-			else {
-				// not all found, wait for a bit
-				try {
-					Thread.sleep(10);
-				}
-				catch (InterruptedException e) {
-					throw new RuntimeException(e);
-				}
-
-				now = System.currentTimeMillis();
-			}
-		}
-
-		fail("The tasks were not started within time (" + timeout + "msecs)");
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
-	 */
-	public static class TaskManagerProcessEntryPoint {
-
-		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
-		public static void main(String[] args) {
-			try {
-				int jobManagerPort = Integer.parseInt(args[0]);
-
-				Configuration cfg = new Configuration();
-				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-
-				TaskManager.runTaskManager(cfg, TaskManager.class);
-
-				// wait forever
-				Object lock = new Object();
-				synchronized (lock) {
-					lock.wait();
-				}
-			}
-			catch (Throwable t) {
-				LOG.error("Failed to start TaskManager process", t);
-				System.exit(1);
-			}
-		}
-	}
-
-	/**
-	 * Utility class to read the output of a process stream and forward it into a StringWriter.
-	 */
-	protected static class PipeForwarder extends Thread {
-
-		private final StringWriter target;
-		private final InputStream source;
-
-		public PipeForwarder(InputStream source, StringWriter target) {
-			super("Pipe Forwarder");
-			setDaemon(true);
-
-			this.source = source;
-			this.target = target;
-
-			start();
-		}
-
-		@Override
-		public void run() {
-			try {
-				int next;
-				while ((next = source.read()) != -1) {
-					target.write(next);
-				}
-			}
-			catch (IOException e) {
-				// terminate
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
new file mode 100644
index 0000000..4311e9c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Abstract base for tests verifying the behavior of the recovery in the
+ * case when a TaskManager fails (process is killed) in the middle of a job execution.
+ *
+ * The test works with multiple task managers processes by spawning JVMs.
+ * Initially, it starts a JobManager in process and two TaskManagers JVMs with
+ * 2 task slots each.
+ * It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the
+ * existence of temporary files. It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+public abstract class AbstractProcessFailureRecoveryTest {
+
+	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
+	protected static final String PROCEED_MARKER_FILE = "proceed";
+
+	protected static final int PARALLELISM = 4;
+
+	@Test
+	public void testTaskManagerProcessFailure() {
+
+		final StringWriter processOutput1 = new StringWriter();
+		final StringWriter processOutput2 = new StringWriter();
+		final StringWriter processOutput3 = new StringWriter();
+
+		ActorSystem jmActorSystem = null;
+		Process taskManagerProcess1 = null;
+		Process taskManagerProcess2 = null;
+		Process taskManagerProcess3 = null;
+
+		File coordinateTempDir = null;
+
+		try {
+			// check that we run this test only if the java command
+			// is available on this machine
+			String javaCommand = getJavaCommandPath();
+			if (javaCommand == null) {
+				System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
+				return;
+			}
+
+			// create a logging file for the process
+			File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+			tempLogFile.deleteOnExit();
+			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+			// coordination between the processes goes through a directory
+			coordinateTempDir = createTempDirectory();
+
+			// find a free port to start the JobManager
+			final int jobManagerPort = NetUtils.getAvailablePort();
+
+			// start a JobManager
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+			Configuration jmConfig = new Configuration();
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "2 s");
+
+			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+
+			// the TaskManager java command
+			String[] command = new String[] {
+					javaCommand,
+					"-Dlog.level=DEBUG",
+					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+					"-Xms80m", "-Xmx80m",
+					"-classpath", getCurrentClasspath(),
+					TaskManagerProcessEntryPoint.class.getName(),
+					String.valueOf(jobManagerPort)
+			};
+
+			// start the first two TaskManager processes
+			taskManagerProcess1 = new ProcessBuilder(command).start();
+			new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
+			taskManagerProcess2 = new ProcessBuilder(command).start();
+			new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
+
+			// we wait for the JobManager to have the two TaskManagers available
+			// wait for at most 20 seconds
+			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000);
+
+			// the program will set a marker file in each of its parallel tasks once they are ready, so that
+			// this coordinating code is aware of this.
+			// the program will very slowly consume elements until the marker file (later created by the
+			// test driver code) is present
+			final File coordinateDirClosure = coordinateTempDir;
+			final Throwable[] errorRef = new Throwable[1];
+
+			// we trigger program execution in a separate thread
+			Thread programTrigger = new Thread("Program Trigger") {
+				@Override
+				public void run() {
+					try {
+						testProgram(jobManagerPort, coordinateDirClosure);
+					}
+					catch (Throwable t) {
+						t.printStackTrace();
+						errorRef[0] = t;
+					}
+				}
+			};
+
+			//start the test program
+			programTrigger.start();
+
+			// wait until all marker files are in place, indicating that all tasks have started
+			// max 20 seconds
+			waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000);
+
+			// start the third TaskManager
+			taskManagerProcess3 = new ProcessBuilder(command).start();
+			new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
+
+			// we wait for the third TaskManager to register (20 seconds max)
+			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000);
+
+			// kill one of the previous TaskManagers, triggering a failure and recovery
+			taskManagerProcess1.destroy();
+			taskManagerProcess1 = null;
+
+			// we create the marker file which signals the program functions tasks that they can complete
+			touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+			// wait for at most 2 minutes for the program to complete
+			programTrigger.join(120000);
+
+			// check that the program really finished
+			assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+			// check whether the program encountered an error
+			if (errorRef[0] != null) {
+				Throwable error = errorRef[0];
+				error.printStackTrace();
+				fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+			}
+
+			// all seems well :-)
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			printProcessLog("TaskManager 1", processOutput1.toString());
+			printProcessLog("TaskManager 2", processOutput2.toString());
+			printProcessLog("TaskManager 3", processOutput3.toString());
+			fail(e.getMessage());
+		}
+		catch (Error e) {
+			e.printStackTrace();
+			printProcessLog("TaskManager 1", processOutput1.toString());
+			printProcessLog("TaskManager 2", processOutput2.toString());
+			printProcessLog("TaskManager 3", processOutput3.toString());
+			throw e;
+		}
+		finally {
+			if (taskManagerProcess1 != null) {
+				taskManagerProcess1.destroy();
+			}
+			if (taskManagerProcess2 != null) {
+				taskManagerProcess2.destroy();
+			}
+			if (taskManagerProcess3 != null) {
+				taskManagerProcess3.destroy();
+			}
+			if (jmActorSystem != null) {
+				jmActorSystem.shutdown();
+			}
+			if (coordinateTempDir != null) {
+				try {
+					FileUtils.deleteDirectory(coordinateTempDir);
+				}
+				catch (Throwable t) {
+					// we can ignore this
+				}
+			}
+		}
+	}
+
+	/**
+	 * The test program should be implemented here in a form of a separate thread.
+	 * This provides a solution for checking that it has been terminated.
+	 *
+	 * @param jobManagerPort The port for submitting the topology to the local cluster
+	 * @param coordinateDir TaskManager failure will be triggered only after processes
+	 *                             have successfully created file under this directory
+	 */
+	public abstract void testProgram(int jobManagerPort, File coordinateDir) throws Exception;
+
+
+	/**
+	 * Repeatedly asks the JobManager how many TaskManagers are registered and returns once
+	 * the reported number equals the expected one. Fails the test when the deadline passes
+	 * first or when the JobManager answers with something that is not an Integer.
+	 *
+	 * @param jobManager The actor that is asked for the number of registered TaskManagers
+	 * @param numExpected The number of TaskManagers that must be reported
+	 * @param maxDelay The maximum time to wait, in milliseconds
+	 * @throws Exception If the request fails for a reason other than a timeout, or if the
+	 *                   thread is interrupted while pausing between two requests
+	 */
+	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+			throws Exception
+	{
+		// pause between two requests, so an incomplete cluster is not asked in a tight loop
+		final long pollIntervalMillis = 50;
+		final long deadline = System.currentTimeMillis() + maxDelay;
+
+		while (true) {
+			long remaining = deadline - System.currentTimeMillis();
+			if (remaining <= 0) {
+				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+			}
+
+			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+			try {
+				Future<?> result = Patterns.ask(jobManager,
+						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+						new Timeout(timeout));
+				Object response = Await.result(result, timeout);
+
+				// check the reply type explicitly rather than relying on a ClassCastException
+				if (!(response instanceof Integer)) {
+					fail("Wrong response: " + response);
+				}
+				if (((Integer) response).intValue() == numExpected) {
+					return;
+				}
+			}
+			catch (TimeoutException e) {
+				// ignore and retry
+			}
+
+			// never sleep past the deadline; the next iteration reports the timeout
+			long pause = Math.min(pollIntervalMillis, deadline - System.currentTimeMillis());
+			if (pause > 0) {
+				Thread.sleep(pause);
+			}
+		}
+	}
+
+	/**
+	 * Prints the captured output of a spawned process to standard out, framed by a begin
+	 * and an end banner. Prints nothing when the log is null or empty.
+	 *
+	 * @param processName The name shown in the begin banner
+	 * @param log The captured output of the process
+	 */
+	protected static void printProcessLog(String processName, String log) {
+		final boolean hasContent = log != null && log.length() != 0;
+
+		if (hasContent) {
+			final String separator = "-----------------------------------------";
+
+			System.out.println(separator);
+			System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
+			System.out.println(separator);
+			System.out.println(log);
+			System.out.println(separator);
+			System.out.println("		END SPAWNED PROCESS LOG");
+			System.out.println(separator);
+		}
+	}
+
+	/**
+	 * Creates a fresh, randomly named directory below the directory given by the
+	 * "java.io.tmpdir" system property. Up to ten random names are tried.
+	 *
+	 * @return The newly created directory
+	 * @throws IOException If none of the attempts produced a new directory
+	 */
+	protected static File createTempDirectory() throws IOException {
+		final File parent = new File(System.getProperty("java.io.tmpdir"));
+		int attemptsLeft = 10;
+
+		while (attemptsLeft > 0) {
+			attemptsLeft--;
+
+			final File candidate = new File(parent, UUID.randomUUID().toString());
+			final boolean created = !candidate.exists() && candidate.mkdirs();
+			if (created) {
+				return candidate;
+			}
+
+			System.err.println("Could not use temporary directory " + candidate.getAbsolutePath());
+		}
+
+		throw new IOException("Could not create temporary file directory");
+	}
+
+	/**
+	 * Creates the file if it does not exist yet and sets its modification time to the
+	 * current time.
+	 *
+	 * @param file The file to touch
+	 * @throws IOException If the file cannot be created or its timestamp cannot be set
+	 */
+	protected static void touchFile(File file) throws IOException {
+		final boolean missing = !file.exists();
+		if (missing) {
+			// opening and immediately closing an output stream leaves an empty file behind
+			FileOutputStream creator = new FileOutputStream(file);
+			creator.close();
+		}
+
+		final boolean stamped = file.setLastModified(System.currentTimeMillis());
+		if (!stamped) {
+			throw new IOException("Could not touch the file.");
+		}
+	}
+
+	/**
+	 * Waits until the marker files with the indexes 0 to (num - 1) exist in the given
+	 * directory, checking every 10 milliseconds. Fails the test when the files are not all
+	 * present before the timeout expires.
+	 *
+	 * @param basedir The directory in which the marker files are expected
+	 * @param num The number of marker files to wait for
+	 * @param timeout The maximum time to wait, in milliseconds
+	 * @throws RuntimeException If the thread is interrupted while waiting; the interrupt
+	 *                          flag is set again before the exception is thrown
+	 */
+	protected static void waitForMarkerFiles(File basedir, int num, long timeout) {
+		long now = System.currentTimeMillis();
+		final long deadline = now + timeout;
+
+		while (now < deadline) {
+			boolean allFound = true;
+
+			for (int i = 0; i < num; i++) {
+				File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
+				if (!nextToCheck.exists()) {
+					allFound = false;
+					break;
+				}
+			}
+
+			if (allFound) {
+				return;
+			}
+
+			// not all found, wait for a bit
+			try {
+				Thread.sleep(10);
+			}
+			catch (InterruptedException e) {
+				// keep the interruption visible to the code further up the stack
+				Thread.currentThread().interrupt();
+				throw new RuntimeException(e);
+			}
+
+			now = System.currentTimeMillis();
+		}
+
+		fail("The tasks were not started within time (" + timeout + "msecs)");
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+	 *
+	 * The only program argument is the port of the JobManager, which is expected to run
+	 * on "localhost". Any failure is logged and ends the JVM with exit code 1.
+	 */
+	public static class TaskManagerProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+		/**
+		 * Starts a TaskManager and then blocks the main thread.
+		 *
+		 * @param args The program arguments; args[0] is the JobManager port
+		 */
+		public static void main(String[] args) {
+			try {
+				int jobManagerPort = Integer.parseInt(args[0]);
+
+				Configuration cfg = new Configuration();
+				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+				TaskManager.runTaskManager(cfg, TaskManager.class);
+
+				// wait forever; Object.wait() may return without a notification (spurious
+				// wakeup), so the wait is re-entered in a loop
+				final Object lock = new Object();
+				synchronized (lock) {
+					while (true) {
+						lock.wait();
+					}
+				}
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start TaskManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
+	/**
+	 * Utility class to read the output of a process stream and forward it into a StringWriter.
+	 *
+	 * The forwarder is a daemon thread that starts itself from the constructor. It decodes
+	 * the bytes of the stream with the platform default charset and ends when the stream
+	 * is exhausted or reading fails. It does not close the source stream.
+	 */
+	protected static class PipeForwarder extends Thread {
+
+		private final StringWriter target;
+		private final InputStream source;
+
+		/**
+		 * Creates the forwarder and immediately starts forwarding.
+		 *
+		 * @param source The stream to read from
+		 * @param target The writer that receives the decoded characters
+		 */
+		public PipeForwarder(InputStream source, StringWriter target) {
+			super("Pipe Forwarder");
+			setDaemon(true);
+
+			this.source = source;
+			this.target = target;
+
+			start();
+		}
+
+		@Override
+		public void run() {
+			try {
+				// decode bytes into characters instead of treating every byte as one char,
+				// which would garble any multi-byte output of the process
+				java.io.Reader reader = new java.io.InputStreamReader(
+						source, java.nio.charset.Charset.defaultCharset());
+
+				char[] chunk = new char[4096];
+				int numRead;
+				while ((numRead = reader.read(chunk)) != -1) {
+					target.write(chunk, 0, numRead);
+				}
+			}
+			catch (IOException e) {
+				// terminate
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
index c013fa8..cdee8ce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -18,24 +18,32 @@
 
 package org.apache.flink.test.recovery;
 
+import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.ProcessFailureRecoveryTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
 
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test for streaming program behaviour in case of taskmanager failure
- * based on {@link ProcessFailureRecoveryTestBase}.
+ * Test the recovery of a simple batch program in the case of TaskManager process failure.
  */
 @SuppressWarnings("serial")
-public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRecoveryITCase {
+@RunWith(Parameterized.class)
+public class ProcessFailureBatchRecoveryITCase extends AbstractProcessFailureRecoveryTest {
 
-	private ExecutionMode executionMode;
+	// --------------------------------------------------------------------------------------------
+	//  Parametrization (run pipelined and batch)
+	// --------------------------------------------------------------------------------------------
+
+	private final ExecutionMode executionMode;
 
 	public ProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
 		this.executionMode = executionMode;
@@ -48,12 +56,17 @@ public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRe
 				{ExecutionMode.BATCH}});
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Test the program
+	// --------------------------------------------------------------------------------------------
+
 	@Override
-	public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef) {
+	public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
-		env.setDegreeOfParallelism(PARALLELISM);
+		env.setParallelism(PARALLELISM);
 		env.setNumberOfExecutionRetries(1);
+		env.getConfig().setExecutionMode(executionMode);
 
 		final long NUM_ELEMENTS = 100000L;
 		final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS)
@@ -63,7 +76,7 @@ public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRe
 						// the majority of the behavior is in the MapFunction
 				.map(new RichMapFunction<Long, Long>() {
 
-					private final File proceedFile = new File(coordinateDirClosure, PROCEED_MARKER_FILE);
+					private final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
 
 					private boolean markerCreated = false;
 					private boolean checkForProceedFile = true;
@@ -72,7 +85,7 @@ public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRe
 					public Long map(Long value) throws Exception {
 						if (!markerCreated) {
 							int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
-							touchFile(new File(coordinateDirClosure, READY_MARKER_FILE_PREFIX + taskIndex));
+							touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
 							markerCreated = true;
 						}
 
@@ -95,24 +108,7 @@ public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRe
 					}
 				});
 
-		// we trigger program execution in a separate thread
-		return new Thread("ProcessFailureBatchRecoveryITCase Program Trigger") {
-			@Override
-			public void run() {
-				try {
-					long sum = result.collect().get(0);
-					assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
-				} catch (Throwable t) {
-					t.printStackTrace();
-					errorRef[0] = t;
-				}
-			}
-		};
-	}
-
-	@Override
-	public void postSubmit() throws Exception, Error {
-		// unnecessary
+		long sum = result.collect().get(0);
+		assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 1933766..82b9d6a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.test.recovery;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -26,58 +28,70 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.test.util.ProcessFailureRecoveryTestBase;
 import org.apache.flink.util.Collector;
 
-import java.io.BufferedWriter;
+import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileWriter;
+import java.io.FileReader;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
 
 /**
- * Test for streaming program behaviour in case of taskmanager failure
- * based on {@link ProcessFailureRecoveryTestBase}.
+ * Test for streaming program behaviour in case of TaskManager failure
+ * based on {@link AbstractProcessFailureRecoveryTest}.
+ *
+ * The logic in this test is as follows:
+ *  - The source slowly emits records (every 50 msecs) until the test driver
+ *    gives the "go" for regular execution
+ *  - The "go" is given after the first taskmanager has been killed, so it can only
+ *    happen in the recovery run
+ *  - The mapper must not be slow, because otherwise the checkpoint barrier cannot pass
+ *    the mapper and no checkpoint will be completed before the killing of the first
+ *    TaskManager.
  */
 @SuppressWarnings("serial")
-public class ProcessFailureStreamingRecoveryITCase extends ProcessFailureRecoveryTestBase {
+public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailureRecoveryTest {
 
-	private static final String RESULT_PATH = "tempTestOutput";
-	private static final int DATA_COUNT = 252;
+	private static final int DATA_COUNT = 10000;
 
 	@Override
-	public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef) {
+	public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {
+
+		final File tempTestOutput = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
+												UUID.randomUUID().toString());
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
-			env.setDegreeOfParallelism(PARALLELISM);
-			env.setNumberOfExecutionRetries(1);
-			env.enableMonitoring(100);
+		assertTrue("Cannot create directory for temp output", tempTestOutput.mkdirs());
 
-		final DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(DATA_COUNT))
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+									.createRemoteEnvironment("localhost", jobManagerPort);
+		env.setParallelism(PARALLELISM);
+		env.setNumberOfExecutionRetries(1);
+		env.enableCheckpointing(200);
+
+		DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
 
 				// make sure every mapper is involved
-				.shuffle()
+//				.shuffle()
 
-				// populate the coordinate directory so we can proceed to taskmanager failure
+				// populate the coordinate directory so we can proceed to TaskManager failure
 				.map(new RichMapFunction<Long, Long>() {
 
 					private boolean markerCreated = false;
 
 					@Override
-					public void open(Configuration parameters) throws IOException {
-
+					public Long map(Long value) throws Exception {
 						if (!markerCreated) {
 							int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
-							try {
-								touchFile(new File(coordinateDirClosure, READY_MARKER_FILE_PREFIX + taskIndex));
-							} catch (IOException e) {
-								e.printStackTrace();
-							}
+							touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
 							markerCreated = true;
 						}
-					}
-
-					@Override
-					public Long map(Long value) throws Exception {
 						return value;
 					}
 				});
@@ -85,124 +99,138 @@ public class ProcessFailureStreamingRecoveryITCase extends ProcessFailureRecover
 		//write result to temporary file
 		result.addSink(new RichSinkFunction<Long>() {
 
-			private transient File output;
-			private transient int outputIndex;
-			private transient BufferedWriter writer;
+			// the sink needs to do its write operations synchronized with
+			// the disk FS, otherwise the process kill will discard data
+			// in buffers in the process
+			private transient FileChannel writer;
 
 			@Override
 			public void open(Configuration parameters) throws IOException {
-				outputIndex = 0;
-				do {
-					output = new File(RESULT_PATH + "-" + outputIndex);
-					outputIndex++;
-				} while (output.exists());
 
-				writer = new BufferedWriter(new FileWriter(output));
-			}
+				int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+				File output = new File(tempTestOutput, "task-" + taskIndex + "-" + UUID.randomUUID().toString());
 
-			@Override
-			public void invoke(Long value) throws Exception {
-				writer.write(value.toString());
-				writer.newLine();
+				// "rws" causes writes to go synchronously to the filesystem, nothing is cached
+				RandomAccessFile outputFile = new RandomAccessFile(output, "rws");
+				this.writer = outputFile.getChannel();
 			}
 
 			@Override
-			public void close(){
-				try {
-					writer.close();
-				} catch (IOException e) {
-					e.printStackTrace();
-				}
+			public void invoke(Long value) throws Exception {
+				String text = value + "\n";
+				byte[] bytes = text.getBytes(Charset.defaultCharset());
+				ByteBuffer buffer = ByteBuffer.wrap(bytes);
+				writer.write(buffer);
 			}
 
 			@Override
-			public void cancel() {
-				close();
+			public void close() throws Exception {
+				writer.close();
 			}
-
 		});
 
-		// we trigger program execution in a separate thread
-		return new ProgramTrigger(env, errorRef);
-	}
+		try {
+			// blocking call until execution is done
+			env.execute();
 
-	@Override
-	public void postSubmit() throws Exception, Error {
-
-		// checks at least once processing guarantee of the output stream
-		fileBatchHasEveryNumberLower(DATA_COUNT, RESULT_PATH);
+			// validate
+			fileBatchHasEveryNumberLower(PARALLELISM, DATA_COUNT, tempTestOutput);
+		}
+		finally {
+			// clean up
+			if (tempTestOutput.exists()) {
+				FileUtils.deleteDirectory(tempTestOutput);
+			}
+		}
 	}
 
 	public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> {
 
-		private static final long SLEEP_TIME = 10;
+		private static final long SLEEP_TIME = 50;
 
-		private long end;
-		private long collected = 0L;
-		private long toCollect;
-		private long coungrence;
-		private long stepSize;
-		private transient OperatorState<Long> collectedState;
+		private final File coordinateDir;
+		private final long end;
 
-		public SleepyDurableGenerateSequence(long end){
+		public SleepyDurableGenerateSequence(File coordinateDir, long end) {
+			this.coordinateDir = coordinateDir;
 			this.end = end;
 		}
 
-		public void open(Configuration parameters) throws Exception {
+		@Override
+		@SuppressWarnings("unchecked")
+		public void run(Collector<Long> collector) throws Exception {
 
 			StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-
+			OperatorState<Long> collectedState;
 			if (context.containsState("collected")) {
 				collectedState = (OperatorState<Long>) context.getState("collected");
-				collected = collectedState.getState();
-			} else {
-				collectedState = new OperatorState<Long>(collected);
+
+//				if (collected == 0) {
+//					throw new RuntimeException("The state did not capture a completed checkpoint");
+//				}
+			}
+			else {
+				collectedState = new OperatorState<Long>(0L);
 				context.registerState("collected", collectedState);
 			}
-			super.open(parameters);
 
-			stepSize = context.getNumberOfParallelSubtasks();
-			coungrence = context.getIndexOfThisSubtask();
-			toCollect = (end % stepSize > coungrence) ? (end / stepSize + 1) : (end / stepSize);
-		}
+			final long stepSize = context.getNumberOfParallelSubtasks();
+			final long congruence = context.getIndexOfThisSubtask();
+			final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
+			long collected = collectedState.getState();
+
+			final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
+			boolean checkForProceedFile = true;
+
+			while (collected < toCollect) {
+				// check if the proceed file exists (then we go full speed)
+				// if not, we always recheck and sleep
+				if (checkForProceedFile) {
+					if (proceedFile.exists()) {
+						checkForProceedFile = false;
+					} else {
+						// otherwise wait so that we make slow progress
+						Thread.sleep(SLEEP_TIME);
+					}
+				}
 
-		@Override
-		public void run(Collector<Long> collector) throws Exception {
-			while (collected < toCollect){
-				collector.collect(collected * stepSize + coungrence);
+				collector.collect(collected * stepSize + congruence);
 				collectedState.update(collected);
 				collected++;
-				Thread.sleep(SLEEP_TIME);
 			}
-
 		}
 
 		@Override
-		public void cancel() {
-		}
+		public void cancel() {}
 	}
 
-	public class ProgramTrigger extends Thread {
 
-		StreamExecutionEnvironment env;
-		Throwable[] errorRef;
+	private static void fileBatchHasEveryNumberLower(int numFiles, int numbers, File path) throws IOException {
 
-		ProgramTrigger(StreamExecutionEnvironment env, Throwable[] errorRef){
-			super("ProcessFailureStreamingRecoveryITCase Program Trigger");
-			this.env = env;
-			this.errorRef = errorRef;
-		}
+		HashSet<Integer> set = new HashSet<Integer>(numbers);
 
-		@Override
-		public void run() {
-			try {
-				env.execute();
-			}
-			catch (Throwable t) {
-				t.printStackTrace();
-				errorRef[0] = t;
+		File[] files = path.listFiles();
+		assertNotNull(files);
+		assertTrue("Not enough output files", files.length >= numFiles);
+
+		for (File file : files) {
+			assertTrue("Output file does not exist", file.exists());
+
+			BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
+
+			String line;
+			while ((line = bufferedReader.readLine()) != null) {
+				int num = Integer.parseInt(line);
+				set.add(num);
 			}
+
+			bufferedReader.close();
 		}
 
+		for (int i = 0; i < numbers; i++) {
+			if (!set.contains(i)) {
+				fail("Missing number: " + i);
+			}
+		}
 	}
 }