You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:54 UTC

[03/47] flink git commit: [FLINK-2354] [runtime] Add job graph and checkpoint recovery

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index 837b643..cd40c82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.types.IntValue;
 import org.junit.Test;
 import scala.concurrent.Await;
@@ -48,7 +49,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import static org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus;
-import static org.junit.Assert.fail;
 
 public class TaskCancelTest {
 
@@ -109,30 +109,21 @@ public class TaskCancelTest {
 
 			// Wait for the job to make some progress and then cancel
 			awaitRunning(
-				flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-				jobGraph.getJobID(),
-				TestingUtils.TESTING_DURATION());
+					flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
+					jobGraph.getJobID(),
+					TestingUtils.TESTING_DURATION());
 
 			Thread.sleep(5000);
 
 			cancelJob(
-				flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-				jobGraph.getJobID(),
-				TestingUtils.TESTING_DURATION());
+					flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
+					jobGraph.getJobID(),
+					TestingUtils.TESTING_DURATION());
 
 			// Wait for the job to be cancelled
-			JobStatus status = awaitTermination(
-				flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-				jobGraph.getJobID(),
-				TestingUtils.TESTING_DURATION());
-
-			if (status == JobStatus.CANCELED) {
-				// Expected :-) All is swell.
-			}
-			else {
-				fail("The job finished with unexpected terminal state " + status + ". "
-						+ "This indicates that there is a bug in the task cancellation.");
-			}
+			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED,
+					flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
+					TestingUtils.TESTING_DURATION());
 		}
 		finally {
 			if (flink != null) {
@@ -224,42 +215,6 @@ public class TaskCancelTest {
 
 	}
 
-	private JobStatus awaitTermination(ActorGateway jobManager, JobID jobId, FiniteDuration timeout)
-			throws Exception {
-
-		checkNotNull(jobManager);
-		checkNotNull(jobId);
-		checkNotNull(timeout);
-
-		while (true) {
-			Future<Object> ask = jobManager.ask(
-					new RequestJobStatus(jobId),
-					timeout);
-
-			Object result = Await.result(ask, timeout);
-
-			if (result instanceof CurrentJobStatus) {
-				// Success
-				CurrentJobStatus status = (CurrentJobStatus) result;
-
-				if (!status.jobID().equals(jobId)) {
-					throw new Exception("JobManager responded for wrong job ID. Request: "
-							+ jobId + ", response: " + status.jobID() + ".");
-				}
-
-				if (status.status().isTerminalState()) {
-					return status.status();
-				}
-			}
-			else if (result instanceof JobNotFound) {
-				throw new Exception("Cannot find job with ID " + jobId + ".");
-			}
-			else {
-				throw new Exception("Unexpected response to cancel request: " + result);
-			}
-		}
-	}
-
 	// ---------------------------------------------------------------------------------------------
 
 	public static class InfiniteSource extends AbstractInvokable {

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 61b1f7a..069b6af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -18,26 +18,17 @@
 
 package org.apache.flink.runtime.testutils;
 
-import static org.junit.Assert.fail;
+import org.apache.flink.runtime.util.FileUtils;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.InputStream;
 import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import java.util.UUID;
 
 /**
  * This class contains auxiliary methods for unit tests.
@@ -77,6 +68,18 @@ public class CommonTestUtils {
 	}
 
 	/**
+	 * Create a temporary log4j configuration for the test.
+	 */
+	public static File createTemporaryLog4JProperties() throws IOException {
+		File log4jProps = File.createTempFile(FileUtils.getRandomFilename(""), "-log4j" +
+				".properties");
+		log4jProps.deleteOnExit();
+		CommonTestUtils.printLog4jDebugConfig(log4jProps);
+
+		return log4jProps;
+	}
+
+	/**
 	 * Tries to get the java executable command with which the current JVM was started.
 	 * Returns null, if the command could not be found.
 	 *
@@ -152,4 +155,50 @@ public class CommonTestUtils {
 			fw.close();
 		}
 	}
+
+	public static File createTempDirectory() throws IOException {
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+		for (int i = 0; i < 10; i++) {
+			File dir = new File(tempDir, UUID.randomUUID().toString());
+			if (!dir.exists() && dir.mkdirs()) {
+				return dir;
+			}
+			System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
+		}
+
+		throw new IOException("Could not create temporary file directory");
+	}
+
+	/**
+	 * Utility class to read the output of a process stream and forward it into a StringWriter.
+	 */
+	public static class PipeForwarder extends Thread {
+
+		private final StringWriter target;
+		private final InputStream source;
+
+		public PipeForwarder(InputStream source, StringWriter target) {
+			super("Pipe Forwarder");
+			setDaemon(true);
+
+			this.source = source;
+			this.target = target;
+
+			start();
+		}
+
+		@Override
+		public void run() {
+			try {
+				int next;
+				while ((next = source.read()) != -1) {
+					target.write(next);
+				}
+			}
+			catch (IOException e) {
+				// terminate
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
new file mode 100644
index 0000000..66e1d9b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound;
+import static org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus;
+import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestJobStatus;
+import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestNumberRegisteredTaskManager;
+
+/**
+ * JobManager actor test utilities.
+ *
+ * <p>If you are using a {@link TestingJobManager} most of these are not needed.
+ */
+public class JobManagerActorTestUtils {
+
+	/**
+	 * Waits for the expected {@link JobStatus}.
+	 *
+	 * <p>Repeatedly queries the JobManager via {@link RequestJobStatus} messages.
+	 *
+	 * @param jobId             Job ID of the job to wait for
+	 * @param expectedJobStatus Expected job status
+	 * @param jobManager        Job manager actor to ask
+	 * @param timeout           Timeout after which the operation fails
+	 * @throws Exception If the expected status is not reached within the timeout or the job enters a different terminal state.
+	 */
+	public static void waitForJobStatus(
+			JobID jobId,
+			JobStatus expectedJobStatus,
+			ActorGateway jobManager,
+			FiniteDuration timeout) throws Exception {
+
+		checkNotNull(jobId, "Job ID");
+		checkNotNull(expectedJobStatus, "Expected job status");
+		checkNotNull(jobManager, "Job manager");
+		checkNotNull(timeout, "Timeout");
+
+		final Deadline deadline = timeout.fromNow();
+
+		while (deadline.hasTimeLeft()) {
+			// Request the job status
+			JobStatusResponse response = requestJobStatus(jobId, jobManager, deadline.timeLeft());
+
+			// Found the job
+			if (response instanceof CurrentJobStatus) {
+				JobStatus jobStatus = ((CurrentJobStatus) response).status();
+
+				// OK, that's what we were waiting for
+				if (jobStatus == expectedJobStatus) {
+					return;
+				}
+				else if (jobStatus.isTerminalState()) {
+					throw new IllegalStateException("Job is in terminal state " + jobStatus + ", "
+							+ "but was waiting for " + expectedJobStatus + ".");
+				}
+			}
+			// Did not find the job... retry
+			else if (response instanceof JobNotFound) {
+				Thread.sleep(Math.min(100, deadline.timeLeft().toMillis()));
+			}
+			else {
+				throw new IllegalStateException("Unexpected response.");
+			}
+		}
+
+		throw new IllegalStateException("Job not found within deadline.");
+	}
+
+	/**
+	 * Request a {@link JobStatusResponse}.
+	 *
+	 * @param jobId      Job ID of the job to request the status of
+	 * @param jobManager Job manager actor to ask
+	 * @param timeout    Timeout after which the operation fails
+	 * @return The {@link JobStatusResponse} from the job manager
+	 * @throws Exception If there is no answer within the timeout.
+	 */
+	public static JobStatusResponse requestJobStatus(
+			JobID jobId,
+			ActorGateway jobManager,
+			FiniteDuration timeout) throws Exception {
+
+		checkNotNull(jobId, "Job ID");
+		checkNotNull(jobManager, "Job manager");
+		checkNotNull(timeout, "Timeout");
+
+		// Ask the JobManager
+		RequestJobStatus request = (RequestJobStatus) getRequestJobStatus(jobId);
+		Future<Object> ask = jobManager.ask(request, timeout);
+		Object response = Await.result(ask, timeout);
+
+		if (response instanceof JobStatusResponse) {
+			return (JobStatusResponse) response;
+		}
+
+		throw new IllegalStateException("Unexpected response.");
+	}
+
+	/**
+	 * Waits for a minimum number of task managers to connect to the job manager.
+	 *
+	 * @param minimumNumberOfTaskManagers Minimum number of task managers to wait for
+	 * @param jobManager                  Job manager actor to ask
+	 * @param timeout                     Timeout after which the operation fails
+	 * @throws Exception If the task managers don't connect within the timeout.
+	 */
+	public static void waitForTaskManagers(
+			int minimumNumberOfTaskManagers,
+			ActorGateway jobManager,
+			FiniteDuration timeout) throws Exception {
+
+		checkArgument(minimumNumberOfTaskManagers >= 1);
+		checkNotNull(jobManager, "Job manager");
+		checkNotNull(timeout, "Timeout");
+
+		final Deadline deadline = timeout.fromNow();
+
+		while (deadline.hasTimeLeft()) {
+			Future<Object> ask = jobManager.ask(getRequestNumberRegisteredTaskManager(),
+					deadline.timeLeft());
+
+			Integer response = (Integer) Await.result(ask, deadline.timeLeft());
+
+			// All are connected. We are done.
+			if (response >= minimumNumberOfTaskManagers) {
+				return;
+			}
+			// Waiting for more... retry
+			else {
+				Thread.sleep(Math.min(100, deadline.timeLeft().toMillis()));
+			}
+		}
+
+		throw new IllegalStateException("Task managers not connected within deadline.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
new file mode 100644
index 0000000..85b768d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.JobManagerMode;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link JobManager} instance running in a separate JVM.
+ */
+public class JobManagerProcess extends TestJvmProcess {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);
+
+	/** ID for this JobManager */
+	private final int id;
+
+	/** The port the JobManager listens on */
+	private final int jobManagerPort;
+
+	/** The configuration for the JobManager */
+	private final Configuration config;
+
+	/** Configuration parsed as args for {@link JobManagerProcess.JobManagerProcessEntryPoint} */
+	private final String[] jvmArgs;
+
+	private ActorRef jobManagerRef;
+
+	/**
+	 * Creates a {@link JobManager} running in a separate JVM.
+	 *
+	 * <p>See {@link #JobManagerProcess(int, Configuration, int)} for a more
+	 * detailed
+	 * description.
+	 *
+	 * @param config Configuration for the job manager process
+	 * @throws Exception
+	 */
+	public JobManagerProcess(int id, Configuration config) throws Exception {
+		this(id, config, 0);
+	}
+
+	/**
+	 * Creates a {@link JobManager} running in a separate JVM.
+	 *
+	 * @param id             ID for the JobManager
+	 * @param config         Configuration for the job manager process
+	 * @param jobManagerPort Job manager port (if <code>0</code>, pick any available port)
+	 * @throws Exception
+	 */
+	public JobManagerProcess(int id, Configuration config, int jobManagerPort) throws Exception {
+		checkArgument(id >= 0, "Negative ID");
+		this.id = id;
+		this.config = checkNotNull(config, "Configuration");
+		this.jobManagerPort = jobManagerPort <= 0 ? NetUtils.getAvailablePort() : jobManagerPort;
+
+		ArrayList<String> args = new ArrayList<>();
+		args.add("--port");
+		args.add(String.valueOf(this.jobManagerPort));
+
+		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+			args.add("--" + entry.getKey());
+			args.add(entry.getValue());
+		}
+
+		this.jvmArgs = new String[args.size()];
+		args.toArray(jvmArgs);
+	}
+
+	@Override
+	public String getName() {
+		return "JobManager " + id;
+	}
+
+	@Override
+	public String[] getJvmArgs() {
+		return jvmArgs;
+	}
+
+	@Override
+	public String getEntryPointClassName() {
+		return JobManagerProcessEntryPoint.class.getName();
+	}
+
+	public int getJobManagerPort() {
+		return jobManagerPort;
+	}
+
+	public Configuration getConfig() {
+		return config;
+	}
+
+	/**
+	 * Returns the Akka URL of this JobManager.
+	 */
+	public String getJobManagerAkkaURL() {
+		return JobManager.getRemoteJobManagerAkkaURL(
+				new InetSocketAddress("localhost", jobManagerPort),
+				Option.<String>empty());
+	}
+
+	@Override
+	public String toString() {
+		return String.format("JobManagerProcess(id=%d, port=%d)", id, jobManagerPort);
+	}
+
+	/**
+	 * Waits for the job manager to be reachable.
+	 *
+	 * <p><strong>Important:</strong> Make sure to set the timeout larger than Akka's gating
+	 * time. Otherwise, this will effectively not wait for the JobManager to start up, because the
+	 * timeout will fire immediately.
+	 *
+	 * @param actorSystem Actor system to be used to resolve JobManager address.
+	 * @param timeout     Timeout (make sure to set larger than Akka's gating time).
+	 */
+	public ActorRef getActorRef(ActorSystem actorSystem, FiniteDuration timeout)
+			throws Exception {
+
+		if (jobManagerRef != null) {
+			return jobManagerRef;
+		}
+
+		checkNotNull(actorSystem, "Actor system");
+
+		// Deadline expires once the given timeout has elapsed
+		Deadline deadline = timeout.fromNow();
+
+		while (deadline.hasTimeLeft()) {
+			try {
+				// If the Actor is not reachable yet, this throws an Exception. Retry until the
+				// deadline passes.
+				this.jobManagerRef = AkkaUtils.getActorRef(
+						getJobManagerAkkaURL(),
+						actorSystem,
+						deadline.timeLeft());
+
+				return jobManagerRef;
+			}
+			catch (Throwable ignored) {
+				// Retry
+				Thread.sleep(Math.min(100, deadline.timeLeft().toMillis()));
+			}
+		}
+
+		throw new IllegalStateException("JobManager did not start up within " + timeout + ".");
+	}
+
+	/**
+	 * Entry point for the JobManager process.
+	 */
+	public static class JobManagerProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcessEntryPoint.class);
+
+		/**
+		 * Runs the JobManager process in {@link JobManagerMode#CLUSTER} and {@link
+		 * StreamingMode#STREAMING} (can handle both batch and streaming jobs).
+		 *
+		 * <p><strong>Required argument</strong>: <code>port</code>. Start the process with
+		 * <code>--port PORT</code>.
+		 *
+		 * <p>Other arguments are parsed to a {@link Configuration} and passed to the
+		 * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum
+		 * "xyz:123:456"</code>.
+		 */
+		public static void main(String[] args) {
+			try {
+				ParameterTool params = ParameterTool.fromArgs(args);
+				final int port = Integer.valueOf(params.getRequired("port"));
+				LOG.info("Running on port {}.", port);
+
+				Configuration config = params.getConfiguration();
+				LOG.info("Configuration: {}.", config);
+
+				// Run the JobManager
+				JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.STREAMING,
+						"localhost", port);
+
+				// Run forever. Forever, ever? Forever, ever!
+				new CountDownLatch(1).await();
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start JobManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
new file mode 100644
index 0000000..f683c55
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link TaskManager} instance running in a separate JVM.
+ */
+public class TaskManagerProcess extends TestJvmProcess {
+
+	/** ID for this TaskManager */
+	private final int id;
+
+	/** The configuration for the TaskManager */
+	private final Configuration config;
+
+	/** Configuration parsed as args for {@link TaskManagerProcess.TaskManagerProcessEntryPoint} */
+	private final String[] jvmArgs;
+
+	public TaskManagerProcess(int id, Configuration config) throws Exception {
+		checkArgument(id >= 0, "Negative ID");
+		this.id = id;
+		this.config = checkNotNull(config, "Configuration");
+
+		ArrayList<String> args = new ArrayList<>();
+
+		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+			args.add("--" + entry.getKey());
+			args.add(entry.getValue());
+		}
+
+		this.jvmArgs = new String[args.size()];
+		args.toArray(jvmArgs);
+	}
+
+	@Override
+	public String getName() {
+		return "TaskManager " + id;
+	}
+
+	@Override
+	public String[] getJvmArgs() {
+		return jvmArgs;
+	}
+
+	@Override
+	public String getEntryPointClassName() {
+		return TaskManagerProcessEntryPoint.class.getName();
+	}
+
+	public int getId() {
+		return id;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("TaskManagerProcess(id=%d)", id);
+	}
+
+	/**
+	 * Entry point for the TaskManager process.
+	 */
+	public static class TaskManagerProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+		/**
+		 * Runs the TaskManager process in {@link StreamingMode#STREAMING} (can handle both batch
+		 * and streaming jobs).
+		 *
+		 * <p>All arguments are parsed to a {@link Configuration} and passed to the TaskManager,
+		 * for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum "xyz:123:456"</code>.
+		 */
+		public static void main(String[] args) throws Exception {
+			try {
+				Configuration config = ParameterTool.fromArgs(args).getConfiguration();
+
+				if (!config.containsKey(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)) {
+					config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+				}
+
+				if (!config.containsKey(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)) {
+					config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				}
+
+
+				LOG.info("Configuration: {}.", config);
+
+				// Run the TaskManager
+				TaskManager.selectNetworkInterfaceAndRunTaskManager(
+						config, StreamingMode.STREAMING, TaskManager.class);
+
+				// Run forever
+				new CountDownLatch(1).await();
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start TaskManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
new file mode 100644
index 0000000..0920b5c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.createTemporaryLog4JProperties;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.fail;
+
+/**
+ * A {@link Process} running a separate JVM.
+ */
+public abstract class TestJvmProcess {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TestJvmProcess.class);
+
+	/** Lock to guard {@link #createAndStart()} and {@link #destroy()} calls. */
+	private final Object createDestroyLock = new Object();
+
+	/** The java command path */
+	private final String javaCommandPath;
+
+	/** The log4j configuration path. */
+	private final String log4jConfigFilePath;
+
+	/** Shutdown hook for resource cleanup */
+	private final Thread shutdownHook;
+
+	/** JVM process memory (set for both '-Xms' and '-Xmx'). */
+	private int jvmMemoryInMb = 80;
+
+	/** The JVM process */
+	private Process process;
+
+	/** Writer for the process output */
+	private volatile StringWriter processOutput;
+
+	public TestJvmProcess() throws Exception {
+		this(getJavaCommandPath(), createTemporaryLog4JProperties().getPath());
+	}
+
+	public TestJvmProcess(String javaCommandPath, String log4jConfigFilePath) {
+		this.javaCommandPath = checkNotNull(javaCommandPath, "Java command path");
+		this.log4jConfigFilePath = checkNotNull(log4jConfigFilePath, "log4j config file path");
+
+		this.shutdownHook = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					destroy();
+				}
+				catch (Throwable t) {
+					LOG.error("Error during process cleanup shutdown hook.", t);
+				}
+			}
+		});
+	}
+
+	/**
+	 * Returns the name of the process.
+	 */
+	public abstract String getName();
+
+	/**
+	 * Returns the arguments to the JVM.
+	 *
+	 * <p>These can be parsed by the main method of the entry point class.
+	 */
+	public abstract String[] getJvmArgs();
+
+	/**
+	 * Returns the name of the class to run.
+	 *
+	 * <p>Arguments to the main method can be specified via {@link #getJvmArgs()}.
+	 */
+	public abstract String getEntryPointClassName();
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets the memory for the process (<code>-Xms</code> and <code>-Xmx</code> flags) (>= 80).
+	 *
+	 * @param jvmMemoryInMb Amount of memory in Megabytes for the JVM (>= 80).
+	 */
+	public void setJVMMemory(int jvmMemoryInMb) {
+		checkArgument(jvmMemoryInMb >= 80, "Process JVM requires at least 80 MBs of memory.");
+		this.jvmMemoryInMb = jvmMemoryInMb;
+	}
+
+	/**
+	 * Creates and starts the {@link Process}.
+	 *
+	 * <strong>Important:</strong> Don't forget to call {@link #destroy()} to prevent
+	 * resource leaks. The created process will be child process and is not guaranteed to
+	 * terminate when the parent process terminates.
+	 */
+	public void createAndStart() throws IOException {
+		String[] cmd = new String[] {
+				javaCommandPath,
+				"-Dlog.level=DEBUG",
+				"-Dlog4j.configuration=file:" + log4jConfigFilePath,
+				"-Xms" + jvmMemoryInMb + "m",
+				"-Xmx" + jvmMemoryInMb + "m",
+				"-classpath", getCurrentClasspath(),
+				getEntryPointClassName() };
+
+		String[] jvmArgs = getJvmArgs();
+
+		if (jvmArgs != null && jvmArgs.length > 0) {
+			cmd = ArrayUtils.addAll(cmd, jvmArgs);
+		}
+
+		synchronized (createDestroyLock) {
+			if (process == null) {
+				LOG.debug("Running command '{}'.", Arrays.toString(cmd));
+				this.process = new ProcessBuilder(cmd).start();
+
+				// Forward output
+				this.processOutput = new StringWriter();
+				new CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput);
+
+				try {
+					// Add JVM shutdown hook to call shutdown of service
+					Runtime.getRuntime().addShutdownHook(shutdownHook);
+				}
+				catch (IllegalStateException ignored) {
+					// JVM is already shutting down. No need to do this.
+				}
+				catch (Throwable t) {
+					LOG.error("Cannot register process cleanup shutdown hook.", t);
+				}
+			}
+			else {
+				throw new IllegalStateException("Already running.");
+			}
+		}
+	}
+
+	public void printProcessLog() {
+		if (processOutput == null) {
+			throw new IllegalStateException("Not started");
+		}
+
+		System.out.println("-----------------------------------------");
+		System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + getName());
+		System.out.println("-----------------------------------------");
+
+		String out = processOutput.toString();
+		if (out == null || out.length() == 0) {
+			System.out.println("(EMPTY)");
+		}
+		else {
+			System.out.println(out);
+		}
+
+		System.out.println("-----------------------------------------");
+		System.out.println("		END SPAWNED PROCESS LOG " + getName());
+		System.out.println("-----------------------------------------");
+	}
+
+	/** Destroys the process (if one is running) and unregisters the cleanup shutdown hook. */
+	public void destroy() {
+		synchronized (createDestroyLock) {
+			if (process != null) {
+				LOG.debug("Destroying {} process.", getName());
+				try {
+					process.destroy();
+				}
+				catch (Throwable t) {
+					LOG.error("Error while trying to destroy process.", t);
+				}
+				finally {
+					process = null;
+					// Skip unregistering when this call is made by the shutdown hook thread itself
+					if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+						try {
+							Runtime.getRuntime().removeShutdownHook(shutdownHook);
+						}
+						catch (IllegalStateException ignored) {
+							// JVM is in shutdown already, we can safely ignore this.
+						}
+						catch (Throwable t) {
+							LOG.warn("Exception while unregistering process cleanup shutdown hook.", t);
+						}
+					}
+				}
+			}
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// File based synchronization utilities
+	// ---------------------------------------------------------------------------------------------
+
+	public static void touchFile(File file) throws IOException {
+		if (!file.exists()) {
+			new FileOutputStream(file).close();
+		}
+		if (!file.setLastModified(System.currentTimeMillis())) {
+			throw new IOException("Could not touch the file.");
+		}
+	}
+
+	/** Polls until files {@code prefix + i}, i in [0, num), all exist in basedir; fails after timeout ms. */
+	public static void waitForMarkerFiles(File basedir, String prefix, int num, long timeout) {
+		long now = System.currentTimeMillis();
+		final long deadline = now + timeout;
+
+		while (now < deadline) {
+			boolean allFound = true;
+
+			for (int i = 0; i < num; i++) {
+				File nextToCheck = new File(basedir, prefix + i);
+				if (!nextToCheck.exists()) {
+					allFound = false;
+					break;
+				}
+			}
+
+			if (allFound) {
+				return;
+			}
+			else {
+				// not all found, wait for a bit
+				try {
+					Thread.sleep(10);
+				}
+				catch (InterruptedException e) {
+					Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+					throw new RuntimeException(e);
+				}
+				now = System.currentTimeMillis();
+			}
+		}
+
+		fail("The tasks were not started within time (" + timeout + "msecs)");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
new file mode 100644
index 0000000..d2e5b6a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper test utilities.
+ */
+public class ZooKeeperTestUtils {
+
+	/**
+	 * Creates a configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 *
+	 * @param zooKeeperQuorum   ZooKeeper quorum to connect to
+	 * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
+	 *                          recovery)
+	 * @return A new configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 */
+	public static Configuration createZooKeeperRecoveryModeConfig(
+			String zooKeeperQuorum, String fsStateHandlePath) {
+
+		return setZooKeeperRecoveryMode(new Configuration(), zooKeeperQuorum, fsStateHandlePath);
+	}
+
+	/**
+	 * Sets all necessary configuration keys to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 *
+	 * @param config            Configuration to use
+	 * @param zooKeeperQuorum   ZooKeeper quorum to connect to
+	 * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
+	 *                          recovery)
+	 * @return The modified configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 */
+	public static Configuration setZooKeeperRecoveryMode(
+			Configuration config,
+			String zooKeeperQuorum,
+			String fsStateHandlePath) {
+
+		checkNotNull(config, "Configuration");
+		checkNotNull(zooKeeperQuorum, "ZooKeeper quorum");
+		checkNotNull(fsStateHandlePath, "File state handle backend path");
+
+		// Web frontend is not used here; its port is set to -1.
+		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
+
+		// ZooKeeper recovery mode
+		config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
+
+		int connTimeout = 5000;
+		if (System.getenv().get("CI") != null) {
+			// The regular timeout is too aggressive for Travis and connections are often lost.
+			connTimeout = 20000;
+		}
+
+		config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
+		config.setInteger(ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
+
+		// File system state backend
+		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+		config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, fsStateHandlePath + "/checkpoints");
+		config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery");
+
+		// Akka failure detection and execution retries
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
+		config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+		config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
+
+		return config;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
new file mode 100644
index 0000000..f0130ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
+ *
+ * <p> Tests include:
+ * <ul>
+ * <li>Expected usage of operations</li>
+ * <li>Correct ordering of ZooKeeper and state handle operations</li>
+ * </ul>
+ */
+public class ZooKeeperStateHandleStoreITCase extends TestLogger {
+
+	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (ZooKeeper != null) {
+			ZooKeeper.shutdown();
+		}
+	}
+
+	@Before
+	public void cleanUp() throws Exception {
+		ZooKeeper.deleteAll();
+	}
+
+	/**
+	 * Tests add operation with default {@link CreateMode}.
+	 */
+	@Test
+	public void testAdd() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testAdd";
+		final Long state = 1239712317L;
+
+		// Test
+		store.add(pathInZooKeeper, state);
+
+		// Verify
+		// State handle created
+		assertEquals(1, stateHandleProvider.getStateHandles().size());
+		assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null));
+
+		// Path created and is persistent
+		Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
+		assertNotNull(stat);
+		assertEquals(0, stat.getEphemeralOwner());
+
+		// Data is equal
+		@SuppressWarnings("unchecked")
+		Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+				ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+				ClassLoader.getSystemClassLoader())).getState(null);
+
+		assertEquals(state, actual);
+	}
+
+	/**
+	 * Tests that {@link CreateMode} is respected.
+	 */
+	@Test
+	public void testAddWithCreateMode() throws Exception {
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		Long state = 3457347234L;
+
+		CreateMode[] modes = CreateMode.values();
+		for (int i = 0; i < modes.length; i++) {
+			CreateMode mode = modes[i];
+			state += i;
+
+			String pathInZooKeeper = "/testAddWithCreateMode" + mode.name();
+
+			// Test
+			store.add(pathInZooKeeper, state, mode);
+
+			if (mode.isSequential()) {
+				// Figure out the sequential ID
+				List<String> paths = ZooKeeper.getClient().getChildren().forPath("/");
+				for (String p : paths) {
+					if (p.startsWith("testAddWithCreateMode" + mode.name())) {
+						pathInZooKeeper = "/" + p;
+						break;
+					}
+				}
+			}
+
+			// Verify
+			// State handle created
+			assertEquals(i + 1, stateHandleProvider.getStateHandles().size());
+			assertEquals(state, stateHandleProvider.getStateHandles().get(i).getState(null));
+
+			// Path created
+			Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
+
+			assertNotNull(stat);
+
+			// Is ephemeral or persistent
+			if (mode.isEphemeral()) {
+				assertTrue(stat.getEphemeralOwner() != 0);
+			}
+			else {
+				assertEquals(0, stat.getEphemeralOwner());
+			}
+
+			// Data is equal
+			@SuppressWarnings("unchecked")
+			Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+					ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+					ClassLoader.getSystemClassLoader())).getState(null);
+
+			assertEquals(state, actual);
+		}
+	}
+
+	/**
+	 * Tests that an existing path throws an Exception.
+	 */
+	@Test(expected = Exception.class)
+	public void testAddAlreadyExistingPath() throws Exception {
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
+
+		store.add("/testAddAlreadyExistingPath", 1L);
+	}
+
+	/**
+	 * Tests that the created state handle is discarded if ZooKeeper create fails.
+	 */
+	@Test
+	public void testAddDiscardStateHandleAfterFailure() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		CuratorFramework client = spy(ZooKeeper.getClient());
+		when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				client, stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
+		final Long state = 81282227L;
+
+		try {
+			// Test
+			store.add(pathInZooKeeper, state);
+			fail("Did not throw expected exception");
+		}
+		catch (Exception ignored) {
+		}
+
+		// Verify
+		// State handle created and discarded
+		assertEquals(1, stateHandleProvider.getStateHandles().size());
+		assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null));
+		assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
+	}
+
+	/**
+	 * Tests that a state handle is replaced.
+	 */
+	@Test
+	public void testReplace() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testReplace";
+		final Long initialState = 30968470898L;
+		final Long replaceState = 88383776661L;
+
+		// Test
+		store.add(pathInZooKeeper, initialState);
+		store.replace(pathInZooKeeper, 0, replaceState);
+
+		// Verify
+		// State handles created
+		assertEquals(2, stateHandleProvider.getStateHandles().size());
+		assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).getState(null));
+		assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).getState(null));
+
+		// Path created and is persistent
+		Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
+		assertNotNull(stat);
+		assertEquals(0, stat.getEphemeralOwner());
+
+		// Data is equal
+		@SuppressWarnings("unchecked")
+		Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+				ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+				ClassLoader.getSystemClassLoader())).getState(null);
+
+		assertEquals(replaceState, actual);
+	}
+
+	/**
+	 * Tests that a non existing path throws an Exception.
+	 */
+	@Test(expected = Exception.class)
+	public void testReplaceNonExistingPath() throws Exception {
+		StateHandleProvider<Long> stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		store.replace("/testReplaceNonExistingPath", 0, 1L);
+	}
+
+	/**
+	 * Tests that the replace state handle is discarded if ZooKeeper setData fails.
+	 */
+	@Test
+	public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		CuratorFramework client = spy(ZooKeeper.getClient());
+		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				client, stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
+		final Long initialState = 30968470898L;
+		final Long replaceState = 88383776661L;
+
+		// Test
+		store.add(pathInZooKeeper, initialState);
+
+		try {
+			store.replace(pathInZooKeeper, 0, replaceState);
+			fail("Did not throw expected exception");
+		}
+		catch (Exception ignored) {
+		}
+
+		// Verify
+		// State handle created and discarded
+		assertEquals(2, stateHandleProvider.getStateHandles().size());
+		assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).getState(null));
+		assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).getState(null));
+		assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
+
+		// Initial value
+		@SuppressWarnings("unchecked")
+		Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+				ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+				ClassLoader.getSystemClassLoader())).getState(null);
+
+		assertEquals(initialState, actual);
+	}
+
+	/**
+	 * Tests get operation.
+	 */
+	@Test
+	public void testGetAndExists() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testGetAndExists";
+		final Long state = 311222268470898L;
+
+		// Test
+		assertEquals(-1, store.exists(pathInZooKeeper));
+
+		store.add(pathInZooKeeper, state);
+		StateHandle<Long> actual = store.get(pathInZooKeeper);
+
+		// Verify
+		assertEquals(state, actual.getState(null));
+		assertTrue(store.exists(pathInZooKeeper) >= 0);
+	}
+
+	/**
+	 * Tests that a non existing path throws an Exception.
+	 */
+	@Test(expected = Exception.class)
+	public void testGetNonExistingPath() throws Exception {
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		store.get("/testGetNonExistingPath");
+	}
+
+	/**
+	 * Tests that all added state is returned.
+	 */
+	@Test
+	public void testGetAll() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testGetAll";
+
+		final Set<Long> expected = new HashSet<>();
+		expected.add(311222268470898L);
+		expected.add(132812888L);
+		expected.add(27255442L);
+		expected.add(11122233124L);
+
+		// Test
+		for (long val : expected) {
+			store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
+		}
+
+		for (Tuple2<StateHandle<Long>, String> val : store.getAll()) {
+			assertTrue(expected.remove(val.f0.getState(null)));
+		}
+		assertEquals(0, expected.size());
+	}
+
+	/**
+	 * Tests that the state is returned sorted.
+	 */
+	@Test
+	public void testGetAllSortedByName() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testGetAllSortedByName";
+
+		final Long[] expected = new Long[] {
+				311222268470898L, 132812888L, 27255442L, 11122233124L };
+
+		// Test
+		for (long val : expected) {
+			store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
+		}
+
+		List<Tuple2<StateHandle<Long>, String>> actual = store.getAllSortedByName();
+		assertEquals(expected.length, actual.size());
+
+		for (int i = 0; i < expected.length; i++) {
+			assertEquals(expected[i], actual.get(i).f0.getState(null));
+		}
+	}
+
+	/**
+	 * Tests that state handles are correctly removed.
+	 */
+	@Test
+	public void testRemove() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testRemove";
+		final Long state = 27255442L;
+
+		store.add(pathInZooKeeper, state);
+
+		// Test
+		store.remove(pathInZooKeeper);
+
+		// Verify discarded
+		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
+	}
+
+	/**
+	 * Tests that state handles are correctly removed with a callback.
+	 */
+	@Test
+	public void testRemoveWithCallback() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testRemoveWithCallback";
+		final Long state = 27255442L;
+
+		store.add(pathInZooKeeper, state);
+
+		final CountDownLatch sync = new CountDownLatch(1);
+		BackgroundCallback callback = mock(BackgroundCallback.class);
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				sync.countDown();
+				return null;
+			}
+		}).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
+
+		// Test
+		store.remove(pathInZooKeeper, callback);
+
+		// Verify discarded and callback called
+		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
+
+		sync.await();
+
+		verify(callback, times(1))
+				.processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
+	}
+
+	/**
+	 * Tests that state handles are correctly discarded.
+	 */
+	@Test
+	public void testRemoveAndDiscardState() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testDiscard";
+		final Long state = 27255442L;
+
+		store.add(pathInZooKeeper, state);
+
+		// Test
+		store.removeAndDiscardState(pathInZooKeeper);
+
+		// Verify discarded
+		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
+	}
+
+	/** Tests that all state handles are correctly discarded. */
+	@Test
+	public void testRemoveAndDiscardAllState() throws Exception {
+		// Setup
+		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZooKeeper.getClient(), stateHandleProvider);
+
+		// Config
+		final String pathInZooKeeper = "/testDiscardAll";
+
+		final Set<Long> expected = new HashSet<>();
+		expected.add(311222268470898L);
+		expected.add(132812888L);
+		expected.add(27255442L);
+		expected.add(11122233124L);
+
+		// Test
+		for (long val : expected) {
+			store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
+		}
+
+		store.removeAndDiscardAllState();
+
+		// Verify all discarded
+		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// Simple test helpers
+	// ---------------------------------------------------------------------------------------------
+
+	private static class LongStateHandleProvider implements StateHandleProvider<Long> {
+
+		private static final long serialVersionUID = 4572084854499402276L;
+
+		private final List<LongStateHandle> stateHandles = new ArrayList<>();
+
+		@Override
+		public StateHandle<Long> createStateHandle(Long state) {
+			LongStateHandle stateHandle = new LongStateHandle(state);
+			stateHandles.add(stateHandle);
+
+			return stateHandle;
+		}
+
+		public List<LongStateHandle> getStateHandles() {
+			return stateHandles;
+		}
+	}
+
+	private static class LongStateHandle implements StateHandle<Long> {
+
+		private static final long serialVersionUID = -3555329254423838912L;
+
+		private final Long state;
+
+		private int numberOfDiscardCalls;
+
+		public LongStateHandle(Long state) {
+			this.state = state;
+		}
+
+		@Override
+		public Long getState(ClassLoader ignored) throws Exception {
+			return state;
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			numberOfDiscardCalls++;
+		}
+
+		public int getNumberOfDiscardCalls() {
+			return numberOfDiscardCalls;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
new file mode 100644
index 0000000..7ae89d1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/**
+ * Simple ZooKeeper and CuratorFramework setup for tests.
+ *
+ * <p>Runs either a single {@link TestingServer} or a {@link TestingCluster}, plus one shared
+ * client. Everything started here is stopped by {@link #shutdown()}.
+ */
+public class ZooKeeperTestEnvironment {
+
+	private final TestingServer zooKeeperServer;
+
+	private final TestingCluster zooKeeperCluster;
+
+	private final CuratorFramework client;
+
+	/**
+	 * Starts a ZooKeeper cluster with the number of quorum peers and a client.
+	 *
+	 * @param numberOfZooKeeperQuorumPeers Starts a {@link TestingServer}, if <code>1</code>.
+	 *                                     Starts a {@link TestingCluster}, if <code>&gt;1</code>.
+	 * @throws IllegalArgumentException If the number of peers is smaller than 1
+	 * @throws RuntimeException If the server/cluster or the client cannot be started
+	 */
+	public ZooKeeperTestEnvironment(int numberOfZooKeeperQuorumPeers) {
+		if (numberOfZooKeeperQuorumPeers <= 0) {
+			throw new IllegalArgumentException("Number of peers needs to be >= 1.");
+		}
+
+		final Configuration conf = new Configuration();
+
+		// Work on locals so that a partially started environment can be torn down on failure
+		TestingServer server = null;
+		TestingCluster cluster = null;
+		CuratorFramework curator = null;
+
+		try {
+			if (numberOfZooKeeperQuorumPeers == 1) {
+				server = new TestingServer(true);
+
+				conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+						server.getConnectString());
+			}
+			else {
+				cluster = new TestingCluster(numberOfZooKeeperQuorumPeers);
+
+				cluster.start();
+
+				conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+						cluster.getConnectString());
+			}
+
+			curator = ZooKeeperUtils.startCuratorFramework(conf);
+
+			curator.newNamespaceAwareEnsurePath("/")
+					.ensure(curator.getZookeeperClient());
+		}
+		catch (Exception e) {
+			// No caller holds a reference yet, so stop whatever was already started
+			closeQuietly(curator);
+			closeQuietly(server);
+			closeQuietly(cluster);
+
+			throw new RuntimeException("Error setting up ZooKeeperTestEnvironment", e);
+		}
+
+		zooKeeperServer = server;
+		zooKeeperCluster = cluster;
+		client = curator;
+	}
+
+	/**
+	 * Shutdown the client and ZooKeeper server/cluster.
+	 *
+	 * <p>The server/cluster is stopped even if closing the client throws.
+	 *
+	 * @throws Exception If closing the client or the server/cluster fails
+	 */
+	public void shutdown() throws Exception {
+		try {
+			if (client != null) {
+				client.close();
+			}
+		}
+		finally {
+			// Only one of the two is ever set, so no further nesting is needed
+			if (zooKeeperServer != null) {
+				zooKeeperServer.close();
+			}
+
+			if (zooKeeperCluster != null) {
+				zooKeeperCluster.close();
+			}
+		}
+	}
+
+	/**
+	 * Returns the connect string of the started ZooKeeper server/cluster.
+	 */
+	public String getConnectString() {
+		if (zooKeeperServer != null) {
+			return zooKeeperServer.getConnectString();
+		}
+		else {
+			return zooKeeperCluster.getConnectString();
+		}
+	}
+
+	/**
+	 * Returns a client for the started ZooKeeper server/cluster.
+	 */
+	public CuratorFramework getClient() {
+		return client;
+	}
+
+	/**
+	 * Creates a new client for the started ZooKeeper server/cluster.
+	 *
+	 * <p>The returned client is not tracked here; {@link #shutdown()} does not close it.
+	 */
+	public CuratorFramework createClient() {
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, getConnectString());
+		return ZooKeeperUtils.startCuratorFramework(config);
+	}
+
+	/**
+	 * Deletes all ZNodes under the root node.
+	 *
+	 * @throws Exception If the ZooKeeper operation fails
+	 */
+	public void deleteAll() throws Exception {
+		final String path = "/" + client.getNamespace();
+		ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, false);
+	}
+
+	/**
+	 * Closes the given resource, if any, ignoring failures. Used only while a setup failure is
+	 * already being reported.
+	 */
+	private static void closeQuietly(java.io.Closeable resource) {
+		if (resource != null) {
+			try {
+				resource.close();
+			}
+			catch (Exception ignored) {
+				// Best effort: the setup failure is the error worth surfacing
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index d1b8fac..9a1cde0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -18,9 +18,12 @@
 
 package org.apache.flink.runtime.executiongraph
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
@@ -32,6 +35,7 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
 
 @RunWith(classOf[JUnitRunner])
 class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
@@ -126,8 +130,23 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
         for (vertex <- eg.getAllExecutionVertices.asScala) {
           vertex.getCurrentExecutionAttempt().cancelingComplete()
         }
-        
+
+        val timeout = new FiniteDuration(2, TimeUnit.MINUTES)
+
+        // Wait for async restart
+        var deadline = timeout.fromNow
+        while (deadline.hasTimeLeft() && eg.getState != JobStatus.RUNNING) {
+          Thread.sleep(100)
+        }
+
         eg.getState should equal(JobStatus.RUNNING)
+
+        // Wait for deploying after async restart
+        deadline = timeout.fromNow
+        while (deadline.hasTimeLeft() && eg.getAllExecutionVertices.asScala.exists(
+          _.getCurrentExecutionAttempt.getState != ExecutionState.DEPLOYING)) {
+          Thread.sleep(100)
+        }
         
         for (vertex <- eg.getAllExecutionVertices.asScala) {
           vertex.getCurrentExecutionAttempt().markFinished()

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c9ae1e4..703d7bf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -43,28 +43,28 @@ import scala.concurrent.{Await, Future}
  *                          otherwise false
  */
 class TestingCluster(
-    userConfiguration: Configuration,
-    singleActorSystem: Boolean,
-    synchronousDispatcher: Boolean,
-    streamingMode: StreamingMode)
+                      userConfiguration: Configuration,
+                      singleActorSystem: Boolean,
+                      synchronousDispatcher: Boolean,
+                      streamingMode: StreamingMode)
   extends FlinkMiniCluster(
     userConfiguration,
     singleActorSystem,
     streamingMode) {
-  
+
 
   def this(userConfiguration: Configuration,
            singleActorSystem: Boolean,
            synchronousDispatcher: Boolean)
-       = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
+  = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
 
   def this(userConfiguration: Configuration, singleActorSystem: Boolean)
-       = this(userConfiguration, singleActorSystem, false)
+  = this(userConfiguration, singleActorSystem, false)
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true, false)
-  
+
   // --------------------------------------------------------------------------
-  
+
   override def generateConfiguration(userConfig: Configuration): Configuration = {
     val cfg = new Configuration()
     cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
@@ -100,16 +100,18 @@ class TestingCluster(
     }
 
     val (executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      executionRetries,
-      delayBetweenRetries,
-      timeout,
-      archiveCount,
-      leaderElectionService) = JobManager.createJobManagerComponents(
-        config,
-        createLeaderElectionService())
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    executionRetries,
+    delayBetweenRetries,
+    timeout,
+    archiveCount,
+    leaderElectionService,
+    submittedJobsGraphs,
+    checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
+      config,
+      createLeaderElectionService())
 
     val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
     val archive = actorSystem.actorOf(testArchiveProps, archiveName)
@@ -126,7 +128,9 @@ class TestingCluster(
         delayBetweenRetries,
         timeout,
         streamingMode,
-        leaderElectionService))
+        leaderElectionService,
+        submittedJobsGraphs,
+        checkpointRecoveryFactory))
 
     val dispatcherJobManagerProps = if (synchronousDispatcher) {
       // disable asynchronous futures (e.g. accumulator update in Heartbeat)

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 312a1e5..be72003 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -18,32 +18,18 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{Cancellable, Terminated, ActorRef}
-import akka.pattern.pipe
-import akka.pattern.ask
-import org.apache.flink.api.common.JobID
+import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.{StreamingMode, FlinkActor}
 import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingMessages.{CheckIfJobRemoved, Alive,
-DisableDisconnect}
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
-
 import scala.language.postfixOps
 
 /** JobManager implementation extended by testing messages
@@ -70,7 +56,9 @@ class TestingJobManager(
     delayBetweenRetries: Long,
     timeout: FiniteDuration,
     mode: StreamingMode,
-    leaderElectionService: LeaderElectionService)
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends JobManager(
     flinkConfiguration,
     executionContext,
@@ -82,5 +70,7 @@ class TestingJobManager(
     delayBetweenRetries,
     timeout,
     mode,
-    leaderElectionService)
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b607433..72a8c25 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -541,7 +542,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					}
 			}
 		}
-	}
 
 	/**
 	 * Registers a timer.

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index c7f7698..11eb174 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -20,22 +20,20 @@ package org.apache.flink.test.util
 
 import java.util.concurrent.TimeoutException
 
-import akka.pattern.ask
-import akka.actor.{Props, ActorRef, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem}
 import akka.pattern.Patterns._
+import akka.pattern.ask
 import org.apache.curator.test.TestingCluster
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobmanager.{RecoveryMode, JobManager}
+import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages
-.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager,
-TestingJobManager, TestingMemoryArchivist}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager, TestingUtils}
 
-import scala.concurrent.{Future, Await}
+import scala.concurrent.{Await, Future}
 
 /**
  * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
@@ -47,20 +45,20 @@ import scala.concurrent.{Future, Await}
  *                          same [[ActorSystem]], otherwise false.
  */
 class ForkableFlinkMiniCluster(
-    userConfiguration: Configuration,
-    singleActorSystem: Boolean,
-    streamingMode: StreamingMode)
+                                userConfiguration: Configuration,
+                                singleActorSystem: Boolean,
+                                streamingMode: StreamingMode)
   extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
 
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
-       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+  = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true)
-  
+
   // --------------------------------------------------------------------------
 
   var zookeeperCluster: Option[TestingCluster] = None
-  
+
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
     val forNumberString = System.getProperty("forkNumber")
 
@@ -264,10 +262,10 @@ object ForkableFlinkMiniCluster {
   import org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT
 
   def startCluster(
-      numSlots: Int,
-      numTaskManagers: Int,
-      timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
-    : ForkableFlinkMiniCluster = {
+                    numSlots: Int,
+                    numTaskManagers: Int,
+                    timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
+  : ForkableFlinkMiniCluster = {
 
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)