You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:55 UTC

[04/47] flink git commit: [FLINK-2354] [runtime] Add job graph and checkpoint recovery

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1330b66..f6ee5c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
@@ -34,13 +33,25 @@ import org.junit.Test;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Tests for the checkpoint coordinator.
  */
 public class CheckpointCoordinatorTest {
-	
+
 	private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();
-	
+
 	@Test
 	public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
 		try {
@@ -50,7 +61,7 @@ public class CheckpointCoordinatorTest {
 			// create some mock Execution vertices that receive the checkpoint trigger messages
 			ExecutionVertex triggerVertex1 = mock(ExecutionVertex.class);
 			ExecutionVertex triggerVertex2 = mock(ExecutionVertex.class);
-			
+
 			// create some mock Execution vertices that need to ack the checkpoint
 			final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
 			final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
@@ -59,10 +70,12 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 1, 600000,
+					jid, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] {}, cl );
+					new ExecutionVertex[] {}, cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -103,10 +116,12 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 1, 600000,
+					jid, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] {}, cl );
+					new ExecutionVertex[] {}, cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -138,17 +153,18 @@ public class CheckpointCoordinatorTest {
 			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
 			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
 			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
-			
+
 			// create some mock Execution vertices that receive the checkpoint trigger messages
 			ExecutionVertex ackVertex1 = mock(ExecutionVertex.class);
 			ExecutionVertex ackVertex2 = mock(ExecutionVertex.class);
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 1, 600000,
+					jid, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] {}, cl );
+					new ExecutionVertex[] {}, cl, new StandaloneCheckpointIDCounter(), new
+					StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -168,13 +184,13 @@ public class CheckpointCoordinatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testTriggerAndConfirmSimpleCheckpoint() {
 		try {
 			final JobID jid = new JobID();
 			final long timestamp = System.currentTimeMillis();
-			
+
 			// create some mock Execution vertices that receive the checkpoint trigger messages
 			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
 			final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
@@ -183,24 +199,26 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 1, 600000,
+					jid, 600000,
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
-					new ExecutionVertex[] { vertex1, vertex2 }, cl);
-			
+					new ExecutionVertex[] { vertex1, vertex2 }, cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
+
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
-			
+
 			// trigger the first checkpoint. this should succeed
 			assertTrue(coord.triggerCheckpoint(timestamp));
-			
+
 			// validate that we have a pending checkpoint
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
-			
+
 			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
-			
+
 			assertNotNull(checkpoint);
 			assertEquals(checkpointId, checkpoint.getCheckpointId());
 			assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
@@ -210,7 +228,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, checkpoint.getCollectedStates().size());
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
-			
+
 			// check that the vertices received the trigger checkpoint message
 			{
 				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointId, timestamp);
@@ -218,7 +236,7 @@ public class CheckpointCoordinatorTest {
 				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
 			}
-			
+
 			// acknowledge from one of the tasks
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
@@ -233,15 +251,15 @@ public class CheckpointCoordinatorTest {
 
 			// acknowledge the other task.
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
-			
+
 			// the checkpoint is internally converted to a successful checkpoint and the
 			// pending checkpoint object is disposed
 			assertTrue(checkpoint.isDiscarded());
-			
+
 			// the now we should have a completed checkpoint
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			
+
 			// validate that the relevant tasks got a confirmation message
 			{
 				NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointId, timestamp);
@@ -249,13 +267,13 @@ public class CheckpointCoordinatorTest {
 				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
 			}
-			
-			SuccessfulCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
+
+			CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
 			assertEquals(jid, success.getJobId());
 			assertEquals(timestamp, success.getTimestamp());
 			assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
 			assertTrue(success.getStates().isEmpty());
-			
+
 			// ---------------
 			// trigger another checkpoint and see that this one replaces the other checkpoint
 			// ---------------
@@ -265,11 +283,11 @@ public class CheckpointCoordinatorTest {
 			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
-			
+
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
-			
-			SuccessfulCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
+
+			CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
 			assertEquals(jid, successNew.getJobId());
 			assertEquals(timestampNew, successNew.getTimestamp());
 			assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -295,8 +313,7 @@ public class CheckpointCoordinatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
-	
+
 	@Test
 	public void testMultipleConcurrentCheckpoints() {
 		try {
@@ -305,7 +322,7 @@ public class CheckpointCoordinatorTest {
 			final long timestamp2 = timestamp1 + 8617;
 
 			// create some mock execution vertices
-			
+
 			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
 			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
 
@@ -314,23 +331,25 @@ public class CheckpointCoordinatorTest {
 			final ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
 
 			final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
-			
+
 			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
 			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
 
 			ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
 			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
 			ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
-			
+
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
-			
+
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 2, 600000,
+					jid, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
-					new ExecutionVertex[] { commitVertex }, cl);
-			
+					new ExecutionVertex[] { commitVertex }, cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
@@ -339,7 +358,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
-			
+
 			PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
 			long checkpointId1 = pending1.getCheckpointId();
 
@@ -348,10 +367,10 @@ public class CheckpointCoordinatorTest {
 					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId1, timestamp1), triggerAttemptID1);
 			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
 					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
-			
+
 			// acknowledge one of the three tasks
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
-			
+
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
 			assertTrue(coord.triggerCheckpoint(timestamp2));
@@ -373,23 +392,23 @@ public class CheckpointCoordinatorTest {
 					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
 			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
 					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId2, timestamp2), triggerAttemptID2);
-			
+
 			// we acknowledge the remaining two tasks from the first
 			// checkpoint and two tasks from the second checkpoint
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
-			
+
 			// now, the first checkpoint should be confirmed
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertTrue(pending1.isDiscarded());
-			
+
 			// the first confirm message should be out
 			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
 					new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);
-			
+
 			// send the last remaining ack for the second checkpoint
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
 
@@ -401,17 +420,17 @@ public class CheckpointCoordinatorTest {
 			// the second commit message should be out
 			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
 					new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
-			
+
 			// validate the committed checkpoints
-			List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
-			
-			SuccessfulCheckpoint sc1 = scs.get(0);
+			List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+
+			CompletedCheckpoint sc1 = scs.get(0);
 			assertEquals(checkpointId1, sc1.getCheckpointID());
 			assertEquals(timestamp1, sc1.getTimestamp());
 			assertEquals(jid, sc1.getJobId());
 			assertTrue(sc1.getStates().isEmpty());
-			
-			SuccessfulCheckpoint sc2 = scs.get(1);
+
+			CompletedCheckpoint sc2 = scs.get(1);
 			assertEquals(checkpointId2, sc2.getCheckpointID());
 			assertEquals(timestamp2, sc2.getTimestamp());
 			assertEquals(jid, sc2.getJobId());
@@ -453,10 +472,12 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 10, 600000,
+					jid, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
-					new ExecutionVertex[] { commitVertex }, cl);
+					new ExecutionVertex[] { commitVertex }, cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(10, cl), RecoveryMode.STANDALONE);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -513,13 +534,13 @@ public class CheckpointCoordinatorTest {
 			// into a successful checkpoint
 			assertTrue(pending1.isDiscarded());
 			assertTrue(pending2.isDiscarded());
-			
+
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// validate the committed checkpoints
-			List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
-			SuccessfulCheckpoint success = scs.get(0);
+			List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+			CompletedCheckpoint success = scs.get(0);
 			assertEquals(checkpointId2, success.getCheckpointID());
 			assertEquals(timestamp2, success.getTimestamp());
 			assertEquals(jid, success.getJobId());
@@ -531,7 +552,7 @@ public class CheckpointCoordinatorTest {
 
 			// send the last remaining ack for the first checkpoint. This should not do anything
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
-			
+
 			coord.shutdown();
 		}
 		catch (Exception e) {
@@ -539,8 +560,7 @@ public class CheckpointCoordinatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
-	
+
 	@Test
 	public void testCheckpointTimeoutIsolated() {
 		try {
@@ -565,22 +585,24 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator
 			// the timeout for the checkpoint is a 200 milliseconds
-			
+
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 2, 200,
+					jid, 200,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] { commitVertex }, cl);
-			
+					new ExecutionVertex[] { commitVertex }, cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp));
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			
+
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
 			assertFalse(checkpoint.isDiscarded());
-			
+
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId()));
-			
+
 			// wait until the checkpoint must have expired.
 			// we check every 250 msecs conservatively for 5 seconds
 			// to give even slow build servers a very good chance of completing this
@@ -591,7 +613,7 @@ public class CheckpointCoordinatorTest {
 			while (!checkpoint.isDiscarded() &&
 					coord.getNumberOfPendingCheckpoints() > 0 &&
 					System.currentTimeMillis() < deadline);
-			
+
 			assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -599,7 +621,7 @@ public class CheckpointCoordinatorTest {
 			// no confirm message must have been sent
 			verify(commitVertex, times(0))
 					.sendMessageToCurrentExecution(any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class));
-			
+
 			coord.shutdown();
 		}
 		catch (Exception e) {
@@ -607,7 +629,7 @@ public class CheckpointCoordinatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void handleMessagesForNonExistingCheckpoints() {
 		try {
@@ -625,27 +647,28 @@ public class CheckpointCoordinatorTest {
 			ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
 			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
-			
+
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 2, 200000,
+					jid, 200000,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] { commitVertex }, cl);
+					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+					(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
 
 			assertTrue(coord.triggerCheckpoint(timestamp));
-			
+
 			long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
-			
+
 			// send some messages that do not belong to either the job or the any
 			// of the vertices that need to be acknowledged.
 			// non of the messages should throw an exception
-			
+
 			// wrong job id
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
-			
+
 			// unknown checkpoint
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L));
-			
+
 			// unknown ack vertex
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
 
@@ -660,15 +683,16 @@ public class CheckpointCoordinatorTest {
 	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
 		return mockExecutionVertex(attemptID, ExecutionState.RUNNING);
 	}
-	
+
 	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, ExecutionState state) {
 		final Execution exec = mock(Execution.class);
 		when(exec.getAttemptId()).thenReturn(attemptID);
 		when(exec.getState()).thenReturn(state);
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
+		when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
 		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-		
+
 		return vertex;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
new file mode 100644
index 0000000..96c4eea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class CheckpointIDCounterTest extends TestLogger {
+	/** Creates the {@link CheckpointIDCounter} under test (despite its name, it creates no checkpoints). */
+	protected abstract CheckpointIDCounter createCompletedCheckpoints() throws Exception;
+
+	public static class StandaloneCheckpointIDCounterTest extends CheckpointIDCounterTest {
+		// Runs the shared tests against StandaloneCheckpointIDCounter.
+		@Override
+		protected CheckpointIDCounter createCompletedCheckpoints() throws Exception {
+			return new StandaloneCheckpointIDCounter();
+		}
+	}
+
+	public static class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTest {
+		// Runs the shared tests against ZooKeeperCheckpointIDCounter using a ZooKeeperTestEnvironment.
+		private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+		@AfterClass
+		public static void tearDown() throws Exception {
+			if (ZooKeeper != null) {
+				ZooKeeper.shutdown();
+			}
+		}
+		// deleteAll() is expected to reset ZooKeeper state, since every test reuses the same counter path.
+		@Before
+		public void cleanUp() throws Exception {
+			ZooKeeper.deleteAll();
+		}
+
+		@Override
+		protected CheckpointIDCounter createCompletedCheckpoints() throws Exception {
+			return new ZooKeeperCheckpointIDCounter(ZooKeeper.getClient(),
+					"/checkpoint-id-counter");
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Tests that sequential {@link CheckpointIDCounter#getAndIncrement()} calls return 1, 2, 3, 4.
+	 */
+	@Test
+	public void testSerialIncrementAndGet() throws Exception {
+		final CheckpointIDCounter counter = createCompletedCheckpoints();
+
+		try {
+			counter.start();
+
+			assertEquals(1, counter.getAndIncrement());
+			assertEquals(2, counter.getAndIncrement());
+			assertEquals(3, counter.getAndIncrement());
+			assertEquals(4, counter.getAndIncrement());
+		}
+		finally {
+			counter.stop();
+		}
+	}
+
+	/**
+	 * Tests concurrent {@link CheckpointIDCounter#getAndIncrement()} calls from multiple threads and
+	 * verifies that, taken together, the returned IDs are exactly 1..N with no gaps or duplicates.
+	 */
+	@Test
+	public void testConcurrentGetAndIncrement() throws Exception {
+		// Config: number of threads calling getAndIncrement() concurrently
+		final int numThreads = 8;
+
+		// Setup: the latch lets all Incrementers start at the same time
+		final CountDownLatch startLatch = new CountDownLatch(1);
+		final CheckpointIDCounter counter = createCompletedCheckpoints();
+		counter.start();
+
+		ExecutorService executor = null;
+		try {
+			executor = Executors.newFixedThreadPool(numThreads);
+
+			List<Future<List<Long>>> resultFutures = new ArrayList<>(numThreads);
+
+			for (int i = 0; i < numThreads; i++) {
+				resultFutures.add(executor.submit(new Incrementer(startLatch, counter)));
+			}
+
+			// Kick off the incrementing
+			startLatch.countDown();
+
+			final int expectedTotal = numThreads * Incrementer.NumIncrements;
+
+			List<Long> all = new ArrayList<>(expectedTotal);
+
+			// Collect the IDs from every Incrementer (Future#get() waits for each task to finish)
+			for (Future<List<Long>> result : resultFutures) {
+				List<Long> counts = result.get();
+
+				for (long val : counts) {
+					all.add(val);
+				}
+			}
+
+			// Verify: after sorting, the IDs must be exactly 1, 2, ..., expectedTotal
+			Collections.sort(all);
+
+			assertEquals(expectedTotal, all.size());
+
+			long current = 0;
+			for (long val : all) {
+				// Each ID follows its predecessor by exactly one (no gaps, no duplicates)
+				assertEquals(++current, val);
+			}
+
+			// The next ID continues the sequence after all concurrent calls
+			assertEquals(expectedTotal + 1, counter.getAndIncrement());
+		}
+		finally {
+			if (executor != null) {
+				executor.shutdown();
+			}
+			// NOTE(review): shutdown() does not await running tasks; after an early failure they may outlive stop().
+			counter.stop();
+		}
+	}
+
+	/**
+	 * Task that awaits the start latch, then calls getAndIncrement() NumIncrements times and returns the IDs.
+	 */
+	private static class Incrementer implements Callable<List<Long>> {
+
+		/** Number of {@link CheckpointIDCounter#getAndIncrement()} calls made by each Incrementer. */
+		private final static int NumIncrements = 128;
+
+		private final CountDownLatch startLatch;
+
+		private final CheckpointIDCounter counter;
+
+		public Incrementer(CountDownLatch startLatch, CheckpointIDCounter counter) {
+			this.startLatch = startLatch;
+			this.counter = counter;
+		}
+
+		@Override
+		public List<Long> call() throws Exception {
+			final Random rand = new Random();
+			final List<Long> counts = new ArrayList<>();
+
+			// Wait for the main thread to kick off execution
+			this.startLatch.await();
+
+			for (int i = 0; i < NumIncrements; i++) {
+				counts.add(counter.getAndIncrement());
+
+				// Sleep 0-19 ms to vary the interleaving between threads
+				Thread.sleep(rand.nextInt(20));
+			}
+
+			return counts;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 08cb0a3..32c15bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
@@ -79,10 +80,12 @@ public class CheckpointStateRestoreTest {
 			map.put(statelessId, stateless);
 			
 			
-			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L, 
+			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-					new ExecutionVertex[0], cl);
+					new ExecutionVertex[0], cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
 			
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -148,10 +151,12 @@ public class CheckpointStateRestoreTest {
 			map.put(statelessId, stateless);
 
 
-			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L,
+			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-					new ExecutionVertex[0], cl);
+					new ExecutionVertex[0], cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -188,10 +193,12 @@ public class CheckpointStateRestoreTest {
 	@Test
 	public void testNoCheckpointAvailable() {
 		try {
-			CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 1, 200000L,
+			CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 200000L,
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
-					new ExecutionVertex[0], cl);
+					new ExecutionVertex[0], cl,
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
new file mode 100644
index 0000000..9e3c605
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.CheckpointMessagesTest;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for basic {@link CompletedCheckpointStore} contract.
+ */
+public abstract class CompletedCheckpointStoreTest extends TestLogger {
+
+	/**
+	 * Creates the {@link CompletedCheckpointStore} implementation to be tested.
+	 *
+	 * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints the store retains
+	 * @param userLoader Class loader the store is expected to use when discarding checkpoints
+	 * @return A fresh store instance
+	 */
+	protected abstract CompletedCheckpointStore createCompletedCheckpoints(
+			int maxNumberOfCheckpointsToRetain, ClassLoader userLoader) throws Exception;
+
+	// ---------------------------------------------------------------------------------------------
+
+	// Class loader handed to the store under test; the tests verify that discards use it.
+	private final ClassLoader userClassLoader = ClassLoader.getSystemClassLoader();
+
+	/**
+	 * Tests that at least one checkpoint needs to be retained.
+	 */
+	@Test(expected = Exception.class)
+	public void testExceptionOnNoRetainedCheckpoints() throws Exception {
+		createCompletedCheckpoints(0, userClassLoader);
+	}
+
+	/**
+	 * Tests adding and getting a checkpoint.
+	 */
+	@Test
+	public void testAddAndGetLatestCheckpoint() throws Exception {
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4, userClassLoader);
+
+		// Empty state
+		assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
+		assertEquals(0, checkpoints.getAllCheckpoints().size());
+
+		TestCheckpoint[] expected = new TestCheckpoint[] {
+				createCheckpoint(0), createCheckpoint(1) };
+
+		// Add and get latest
+		checkpoints.addCheckpoint(expected[0]);
+		assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
+		verifyCheckpoint(expected[0], checkpoints.getLatestCheckpoint());
+
+		checkpoints.addCheckpoint(expected[1]);
+		assertEquals(2, checkpoints.getNumberOfRetainedCheckpoints());
+		verifyCheckpoint(expected[1], checkpoints.getLatestCheckpoint());
+	}
+
+	/**
+	 * Tests that adding more checkpoints than retained discards the correct checkpoints (using
+	 * the correct class loader).
+	 */
+	@Test
+	public void testAddCheckpointMoreThanMaxRetained() throws Exception {
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1, userClassLoader);
+
+		TestCheckpoint[] expected = new TestCheckpoint[] {
+				createCheckpoint(0), createCheckpoint(1),
+				createCheckpoint(2), createCheckpoint(3)
+		};
+
+		// Add checkpoints
+		checkpoints.addCheckpoint(expected[0]);
+		assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
+
+		for (int i = 1; i < expected.length; i++) {
+			checkpoints.addCheckpoint(expected[i]);
+
+			// The ZooKeeper implementation discards asynchronously
+			expected[i - 1].awaitDiscard();
+			assertTrue(expected[i - 1].isDiscarded());
+			assertEquals(userClassLoader, expected[i - 1].getDiscardClassLoader());
+
+			assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
+		}
+	}
+
+	/**
+	 * Tests that
+	 * <ul>
+	 * <li>{@link CompletedCheckpointStore#getLatestCheckpoint()} returns <code>null</code>,</li>
+	 * <li>{@link CompletedCheckpointStore#getAllCheckpoints()} returns an empty list,</li>
+	 * <li>{@link CompletedCheckpointStore#getNumberOfRetainedCheckpoints()} returns 0.</li>
+	 * </ul>
+	 */
+	@Test
+	public void testEmptyState() throws Exception {
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1, userClassLoader);
+
+		assertNull(checkpoints.getLatestCheckpoint());
+		assertEquals(0, checkpoints.getAllCheckpoints().size());
+		assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
+	}
+
+	/**
+	 * Tests that all added checkpoints are returned.
+	 */
+	@Test
+	public void testGetAllCheckpoints() throws Exception {
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4, userClassLoader);
+
+		TestCheckpoint[] expected = new TestCheckpoint[] {
+				createCheckpoint(0), createCheckpoint(1),
+				createCheckpoint(2), createCheckpoint(3)
+		};
+
+		for (TestCheckpoint checkpoint : expected) {
+			checkpoints.addCheckpoint(checkpoint);
+		}
+
+		List<CompletedCheckpoint> actual = checkpoints.getAllCheckpoints();
+
+		assertEquals(expected.length, actual.size());
+
+		for (int i = 0; i < expected.length; i++) {
+			assertEquals(expected[i], actual.get(i));
+		}
+	}
+
+	/**
+	 * Tests that all checkpoints are discarded (using the correct class loader).
+	 */
+	@Test
+	public void testDiscardAllCheckpoints() throws Exception {
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4, userClassLoader);
+
+		TestCheckpoint[] expected = new TestCheckpoint[] {
+				createCheckpoint(0), createCheckpoint(1),
+				createCheckpoint(2), createCheckpoint(3)
+		};
+
+		for (TestCheckpoint checkpoint : expected) {
+			checkpoints.addCheckpoint(checkpoint);
+		}
+
+		checkpoints.discardAllCheckpoints();
+
+		// Empty state
+		assertNull(checkpoints.getLatestCheckpoint());
+		assertEquals(0, checkpoints.getAllCheckpoints().size());
+		assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
+
+		// All have been discarded
+		for (TestCheckpoint checkpoint : expected) {
+			// The ZooKeeper implementation discards asynchronously
+			checkpoint.awaitDiscard();
+			assertTrue(checkpoint.isDiscarded());
+			assertEquals(userClassLoader, checkpoint.getDiscardClassLoader());
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a {@link TestCheckpoint} with the given ID and four task states.
+	 */
+	protected TestCheckpoint createCheckpoint(int id) throws IOException {
+		return createCheckpoint(id, 4);
+	}
+
+	/**
+	 * Creates a {@link TestCheckpoint} with the given ID for a fresh job, holding
+	 * <code>numberOfStates</code> task states of a single (random) job vertex.
+	 */
+	protected TestCheckpoint createCheckpoint(int id, int numberOfStates)
+			throws IOException {
+
+		JobVertexID jvid = new JobVertexID();
+
+		ArrayList<StateForTask> taskStates = new ArrayList<>();
+
+		for (int i = 0; i < numberOfStates; i++) {
+			SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>(
+					new CheckpointMessagesTest.MyHandle());
+
+			taskStates.add(new StateForTask(stateHandle, jvid, i));
+		}
+
+		return new TestCheckpoint(new JobID(), id, 0, taskStates);
+	}
+
+	/**
+	 * Asserts that both checkpoints agree on job ID, checkpoint ID, timestamp and task states.
+	 */
+	private void verifyCheckpoint(CompletedCheckpoint expected, CompletedCheckpoint actual) {
+		assertEquals(expected.getJobId(), actual.getJobId());
+		assertEquals(expected.getCheckpointID(), actual.getCheckpointID());
+		assertEquals(expected.getTimestamp(), actual.getTimestamp());
+
+		List<StateForTask> expectedStates = expected.getStates();
+		List<StateForTask> actualStates = actual.getStates();
+
+		assertEquals(expectedStates.size(), actualStates.size());
+
+		for (int i = 0; i < expectedStates.size(); i++) {
+			assertEquals(expectedStates.get(i), actualStates.get(i));
+		}
+	}
+
+	/**
+	 * A test {@link CompletedCheckpoint}. We want to verify that the correct class loader is
+	 * used when discarding. Spying on a regular {@link CompletedCheckpoint} instance with
+	 * Mockito doesn't work, because it breaks serializability.
+	 */
+	protected static class TestCheckpoint extends CompletedCheckpoint {
+
+		private static final long serialVersionUID = 4211419809665983026L;
+
+		// Upper bound for awaitDiscard(), so that a store which never discards fails the
+		// test (via the subsequent isDiscarded() assertion) instead of hanging it.
+		private static final long DISCARD_TIMEOUT_SECONDS = 60;
+
+		// Volatile, because some store implementations (ZooKeeper) discard on another thread.
+		private volatile boolean isDiscarded;
+
+		// Latch for test variants which discard asynchronously. It is transient and therefore
+		// null after deserialization, which is why every access is null-guarded.
+		private transient final CountDownLatch discardLatch = new CountDownLatch(1);
+
+		private transient volatile ClassLoader discardClassLoader;
+
+		public TestCheckpoint(
+				JobID jobId,
+				long checkpointId,
+				long timestamp,
+				ArrayList<StateForTask> states) {
+
+			super(jobId, checkpointId, timestamp, states);
+		}
+
+		/**
+		 * Discards the checkpoint and records the class loader of the first discard call.
+		 */
+		@Override
+		public void discard(ClassLoader userClassLoader) {
+			super.discard(userClassLoader);
+
+			if (!isDiscarded) {
+				// Set the loader before the flag, so that a reader observing isDiscarded == true
+				// also observes the class loader.
+				this.discardClassLoader = userClassLoader;
+				this.isDiscarded = true;
+
+				if (discardLatch != null) {
+					discardLatch.countDown();
+				}
+			}
+		}
+
+		public boolean isDiscarded() {
+			return isDiscarded;
+		}
+
+		/**
+		 * Waits until this checkpoint has been discarded or the timeout has elapsed, whichever
+		 * comes first. Callers assert {@link #isDiscarded()} afterwards. Returns immediately for
+		 * deserialized instances, which have no latch.
+		 */
+		public void awaitDiscard() throws InterruptedException {
+			if (discardLatch != null) {
+				discardLatch.await(DISCARD_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
+			}
+		}
+
+		/**
+		 * Returns the class loader passed to the first {@link #discard(ClassLoader)} call, or
+		 * <code>null</code> if the checkpoint has not been discarded.
+		 */
+		public ClassLoader getDiscardClassLoader() {
+			return discardClassLoader;
+		}
+
+		/**
+		 * Two test checkpoints are equal if they have the same job ID and checkpoint ID.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			TestCheckpoint that = (TestCheckpoint) o;
+
+			return getJobId().equals(that.getJobId())
+					&& getCheckpointID() == that.getCheckpointID();
+		}
+
+		@Override
+		public int hashCode() {
+			return getJobId().hashCode() + (int) getCheckpointID();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
new file mode 100644
index 0000000..beccbf8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Tests for basic {@link CompletedCheckpointStore} contract.
+ */
+public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest {
+
+	/**
+	 * Creates a {@link StandaloneCompletedCheckpointStore}, so that the inherited contract tests
+	 * run against the standalone implementation.
+	 */
+	@Override
+	protected CompletedCheckpointStore createCompletedCheckpoints(
+			int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
+
+		final CompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(
+				maxNumberOfCheckpointsToRetain, userClassLoader);
+
+		return store;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
new file mode 100644
index 0000000..4c6ddfd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for basic {@link CompletedCheckpointStore} contract and ZooKeeper state handling.
+ */
+public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
+
+	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+	private final static String CheckpointsPath = "/checkpoints";
+
+	/**
+	 * Shuts down the ZooKeeper test environment after all tests.
+	 */
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (ZooKeeper != null) {
+			ZooKeeper.shutdown();
+		}
+	}
+
+	/**
+	 * Removes all ZooKeeper state before each test.
+	 */
+	@Before
+	public void cleanUp() throws Exception {
+		ZooKeeper.deleteAll();
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperCompletedCheckpointStore} rooted at {@link #CheckpointsPath},
+	 * backed by a local state handle provider.
+	 */
+	@Override
+	protected CompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain,
+			ClassLoader userLoader) throws Exception {
+
+		return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
+				ZooKeeper.createClient(), CheckpointsPath, new LocalStateHandle
+				.LocalStateHandleProvider<CompletedCheckpoint>());
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Tests that {@link CompletedCheckpointStore#recover()} removes all but the latest checkpoint
+	 * from ZooKeeper and that the latest checkpoint is still available afterwards.
+	 */
+	@Test
+	public void testRecover() throws Exception {
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(3, ClassLoader
+				.getSystemClassLoader());
+
+		TestCheckpoint[] expected = new TestCheckpoint[] {
+				createCheckpoint(0), createCheckpoint(1), createCheckpoint(2)
+		};
+
+		// Add multiple checkpoints
+		checkpoints.addCheckpoint(expected[0]);
+		checkpoints.addCheckpoint(expected[1]);
+		checkpoints.addCheckpoint(expected[2]);
+
+		// All three should be in ZK
+		assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+
+		// Recover
+		checkpoints.recover();
+
+		// Only the latest one should be in ZK
+		Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();
+
+		// Retry this operation, because removal is asynchronous
+		while (deadline.hasTimeLeft() && ZooKeeper.getClient()
+				.getChildren().forPath(CheckpointsPath).size() != 1) {
+
+			// The deadline may expire between hasTimeLeft() and timeLeft(). Clamp to zero,
+			// because Thread.sleep rejects negative values.
+			long sleepMillis = Math.min(100, deadline.timeLeft().toMillis());
+			Thread.sleep(Math.max(0, sleepMillis));
+		}
+
+		assertEquals(1, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+		assertEquals(expected[2], checkpoints.getLatestCheckpoint());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 5a5ef57..fa61acf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -178,6 +178,10 @@ public class BlobLibraryCacheManagerTest {
 
 				// un-register them again
 				libCache.unregisterTask(jid, executionId);
+
+				// Don't fail if called again
+				libCache.unregisterTask(jid, executionId);
+
 				assertEquals(0, libCache.getNumberOfReferenceHolders(jid));
 
 				// library is still cached (but not associated with job any more)

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 56e5bde..ca8810b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -25,7 +25,7 @@ import io.netty.channel.ChannelPromise;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.util.NetUtils;
 import org.junit.Ignore;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
new file mode 100644
index 0000000..ac250bd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests recovery of {@link SubmittedJobGraph} instances.
+ */
+public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
+
+	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+	private final static FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	private static final File FileStateBackendBasePath;
+
+	static {
+		try {
+			FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error in test setup. Could not create directory.", e);
+		}
+	}
+
+	/**
+	 * Shuts down the ZooKeeper test environment and deletes the file state backend directory.
+	 */
+	@AfterClass
+	public static void tearDown() throws Exception {
+		ZooKeeper.shutdown();
+
+		if (FileStateBackendBasePath != null) {
+			FileUtils.deleteDirectory(FileStateBackendBasePath);
+		}
+	}
+
+	/**
+	 * Empties the file state backend directory and all ZooKeeper state before each test.
+	 */
+	@Before
+	public void cleanUp() throws Exception {
+		if (FileStateBackendBasePath != null) {
+			FileUtils.cleanDirectory(FileStateBackendBasePath);
+		}
+
+		ZooKeeper.deleteAll();
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Tests that the recovery state is cleaned up after a JobManager stops.
+	 */
+	@Test
+	public void testJobManagerCleanUp() throws Exception {
+		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+
+		// Configure the cluster
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
+		TestingCluster flink = new TestingCluster(config, false, false, StreamingMode.STREAMING);
+
+		try {
+			final Deadline deadline = TestTimeOut.fromNow();
+
+			// Start the JobManager and TaskManager
+			flink.start(true);
+
+			JobGraph jobGraph = createBlockingJobGraph();
+
+			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
+
+			// Submit the job
+			jobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
+
+			// Wait for the job to start
+			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
+					jobManager, deadline.timeLeft());
+		}
+		finally {
+			flink.shutdown();
+		}
+
+		// Verify that everything is clean
+		verifyCleanRecoveryState(config);
+	}
+
+	/**
+	 * Tests that submissions to non-leaders are handled.
+	 */
+	@Test
+	public void testSubmitJobToNonLeader() throws Exception {
+		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+
+		// Configure the cluster
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
+		TestingCluster flink = new TestingCluster(config, false, false, StreamingMode.STREAMING);
+
+		try {
+			final Deadline deadline = TestTimeOut.fromNow();
+
+			// Start the JobManager and TaskManager
+			flink.start(true);
+
+			JobGraph jobGraph = createBlockingJobGraph();
+
+			List<ActorRef> bothJobManagers = flink.getJobManagersAsJava();
+
+			ActorGateway leadingJobManager = flink.getLeaderGateway(deadline.timeLeft());
+
+			ActorGateway nonLeadingJobManager;
+			if (bothJobManagers.get(0).equals(leadingJobManager.actor())) {
+				nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(1), null);
+			}
+			else {
+				nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(0), null);
+			}
+
+			// Submit the job
+			nonLeadingJobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
+
+			// Wait for the job to start. We are asking the **leading** JM here although we've
+			// submitted the job to the non-leading JM. This is the behaviour under test.
+			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
+					leadingJobManager, deadline.timeLeft());
+
+			// Make sure that the **non-leading** JM has actually removed the job graph from her
+			// local state.
+			boolean success = false;
+			while (!success && deadline.hasTimeLeft()) {
+				JobStatusResponse jobStatusResponse = JobManagerActorTestUtils.requestJobStatus(
+						jobGraph.getJobID(), nonLeadingJobManager, deadline.timeLeft());
+
+				if (jobStatusResponse instanceof JobManagerMessages.JobNotFound) {
+					success = true;
+				}
+				else {
+					Thread.sleep(100);
+				}
+			}
+
+			if (!success) {
+				fail("Non-leading JM was still holding reference to the job graph.");
+			}
+		}
+		finally {
+			flink.shutdown();
+		}
+
+		// Verify that everything is clean
+		verifyCleanRecoveryState(config);
+	}
+
+	/**
+	 * Tests that clients receive updates after recovery by a new leader.
+	 */
+	@Test
+	public void testClientNonDetachedListeningBehaviour() throws Exception {
+		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+
+		// Test actor system
+		ActorSystem testSystem = null;
+
+		// JobManager setup. Start the job managers as separate processes in order to not run the
+		// actors postStop, which cleans up all running jobs.
+		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
+
+		LeaderRetrievalService leaderRetrievalService = null;
+
+		ActorSystem taskManagerSystem = null;
+
+		// Set as the last statement of the try block. Any error propagates to the test runner
+		// (instead of being swallowed); this flag only decides whether to dump the process logs.
+		boolean testSucceeded = false;
+
+		try {
+			final Deadline deadline = TestTimeOut.fromNow();
+
+			// Test actor system
+			testSystem = AkkaUtils.createActorSystem(new Configuration(),
+					new Some<>(new Tuple2<String, Object>("localhost", 0)));
+
+			// The job managers
+			jobManagerProcess[0] = new JobManagerProcess(0, config);
+			jobManagerProcess[1] = new JobManagerProcess(1, config);
+
+			jobManagerProcess[0].createAndStart();
+			jobManagerProcess[1].createAndStart();
+
+			// Leader listener
+			TestingListener leaderListener = new TestingListener();
+			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+			leaderRetrievalService.start(leaderListener);
+
+			// The task manager
+			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+			TaskManager.startTaskManagerComponentsAndActor(
+					config, taskManagerSystem, "localhost",
+					Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
+					false, StreamingMode.STREAMING, TaskManager.class);
+
+			// Client test actor
+			TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
+					testSystem, Props.create(RecordingTestClient.class));
+
+			JobGraph jobGraph = createBlockingJobGraph();
+
+			{
+				// Initial submission
+				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+				String leaderAddress = leaderListener.getAddress();
+				UUID leaderId = leaderListener.getLeaderSessionID();
+
+				// The client
+				AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);
+
+				// Get the leader ref
+				ActorRef leaderRef = AkkaUtils.getActorRef(
+						leaderAddress, testSystem, deadline.timeLeft());
+				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
+
+				// Submit the job in non-detached mode
+				leader.tell(new SubmitJob(jobGraph,
+						ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);
+
+				JobManagerActorTestUtils.waitForJobStatus(
+						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
+			}
+
+			// Who's the boss?
+			JobManagerProcess leadingJobManagerProcess;
+			if (jobManagerProcess[0].getJobManagerAkkaURL().equals(leaderListener.getAddress())) {
+				leadingJobManagerProcess = jobManagerProcess[0];
+			}
+			else {
+				leadingJobManagerProcess = jobManagerProcess[1];
+			}
+
+			// Kill the leading job manager process
+			leadingJobManagerProcess.destroy();
+
+			{
+				// Recovery by the standby JobManager
+				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+				String leaderAddress = leaderListener.getAddress();
+				UUID leaderId = leaderListener.getLeaderSessionID();
+
+				ActorRef leaderRef = AkkaUtils.getActorRef(
+						leaderAddress, testSystem, deadline.timeLeft());
+				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
+
+				JobManagerActorTestUtils.waitForJobStatus(
+						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
+
+				// Cancel the job
+				leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+			}
+
+			// Wait for the execution result
+			if (!clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis())) {
+				fail("Timed out while waiting for the job result.");
+			}
+
+			int jobSubmitSuccessMessages = 0;
+			for (Object msg : clientRef.underlyingActor().getMessages()) {
+				if (msg instanceof JobManagerMessages.JobSubmitSuccess) {
+					jobSubmitSuccessMessages++;
+				}
+			}
+
+			// Both submissions (initial and recovery) should have been ack-ed. Note that this
+			// can in principle fail if an ack is overtaken by the final job result message.
+			assertEquals(2, jobSubmitSuccessMessages);
+
+			testSucceeded = true;
+		}
+		finally {
+			if (!testSucceeded) {
+				// In case of an error, print the job manager process logs before the processes
+				// are destroyed below.
+				if (jobManagerProcess[0] != null) {
+					jobManagerProcess[0].printProcessLog();
+				}
+
+				if (jobManagerProcess[1] != null) {
+					jobManagerProcess[1].printProcessLog();
+				}
+			}
+
+			if (jobManagerProcess[0] != null) {
+				jobManagerProcess[0].destroy();
+			}
+
+			if (jobManagerProcess[1] != null) {
+				jobManagerProcess[1].destroy();
+			}
+
+			if (leaderRetrievalService != null) {
+				leaderRetrievalService.stop();
+			}
+
+			if (taskManagerSystem != null) {
+				taskManagerSystem.shutdown();
+			}
+
+			if (testSystem != null) {
+				testSystem.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Simple recording client.
+	 */
+	private static class RecordingTestClient extends UntypedActor {
+
+		private final Queue<Object> messages = new ConcurrentLinkedQueue<>();
+
+		private final CountDownLatch jobResultLatch = new CountDownLatch(1);
+
+		/**
+		 * Records every message (unwrapping {@link LeaderSessionMessage}s) and releases the
+		 * job result latch on {@link JobManagerMessages.JobResultSuccess} or
+		 * {@link JobManagerMessages.JobResultFailure}.
+		 */
+		@Override
+		public void onReceive(Object message) throws Exception {
+			if (message instanceof LeaderSessionMessage) {
+				message = ((LeaderSessionMessage) message).message();
+			}
+
+			messages.add(message);
+
+			// Check for job result
+			if (message instanceof JobManagerMessages.JobResultFailure ||
+					message instanceof JobManagerMessages.JobResultSuccess) {
+
+				jobResultLatch.countDown();
+			}
+		}
+
+		public Queue<Object> getMessages() {
+			return messages;
+		}
+
+		/**
+		 * Waits for a job result message.
+		 *
+		 * @param timeout Maximum time to wait in milliseconds
+		 * @return <code>true</code> if a job result was received, <code>false</code> if the
+		 * timeout elapsed first
+		 */
+		public boolean awaitJobResult(long timeout) throws InterruptedException {
+			return jobResultLatch.await(timeout, TimeUnit.MILLISECONDS);
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a simple blocking JobGraph.
+	 */
+	private static JobGraph createBlockingJobGraph() {
+		JobGraph jobGraph = new JobGraph("Blocking program");
+
+		JobVertex jobVertex = new JobVertex("Blocking Vertex");
+		jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+
+		jobGraph.addVertex(jobVertex);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
+	 */
+	private static void verifyCleanRecoveryState(Configuration config) throws Exception {
+		// File state backend empty
+		Collection<File> stateHandles = FileUtils.listFiles(
+				FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+
+		if (!stateHandles.isEmpty()) {
+			fail("File state backend is not clean: " + stateHandles);
+		}
+
+		// ZooKeeper
+		String currentJobsPath = config.getString(
+				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+
+		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
+
+		if (stat == null) {
+			// checkExists() returns null for a missing node; report it instead of failing
+			// with a NullPointerException below.
+			fail("ZooKeeper path '" + currentJobsPath + "' does not exist. " +
+					"The job graph store has not been used during this test.");
+			return;
+		}
+
+		if (stat.getCversion() == 0) {
+			// Sanity check: verify that some changes have been performed
+			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
+					"this test. What are you testing?");
+		}
+
+		if (stat.getNumChildren() != 0) {
+			// Is everything clean again?
+			fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " +
+					ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
new file mode 100644
index 0000000..753e7be
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StandaloneSubmittedJobGraphStore}, whose operations are expected to be no-ops.
+ */
+public class StandaloneSubmittedJobGraphStoreTest {
+
+	/**
+	 * Verifies that put and remove succeed but never make a job graph recoverable, and that
+	 * recovering an unknown job yields nothing.
+	 */
+	@Test
+	public void testNoOps() throws Exception {
+		final StandaloneSubmittedJobGraphStore store = new StandaloneSubmittedJobGraphStore();
+
+		final JobGraph graph = new JobGraph("testNoOps");
+		final JobInfo info = new JobInfo(
+				ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE);
+		final SubmittedJobGraph submitted = new SubmittedJobGraph(graph, info);
+
+		// Initially nothing to recover
+		assertEquals(0, store.recoverJobGraphs().size());
+
+		// Adding does not persist anything
+		store.putJobGraph(submitted);
+		assertEquals(0, store.recoverJobGraphs().size());
+
+		// Removing is equally a no-op
+		final JobID submittedJobId = submitted.getJobGraph().getJobID();
+		store.removeJobGraph(submittedJobId);
+		assertEquals(0, store.recoverJobGraphs().size());
+
+		// Recovering an unknown job yields an empty result
+		assertTrue(store.recoverJobGraph(new JobID()).isEmpty());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
new file mode 100644
index 0000000..861a713
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
+import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the basic {@link SubmittedJobGraphStore} contract, exercised via
+ * {@link ZooKeeperSubmittedJobGraphStore} against a {@link ZooKeeperTestEnvironment}.
+ */
+public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
+
+	/**
+	 * Upper bound for waiting on listener notifications, so that a lost notification fails
+	 * the test instead of hanging it forever.
+	 */
+	private static final long NOTIFICATION_TIMEOUT_SECONDS = 60;
+
+	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+	private final static LocalStateHandleProvider<SubmittedJobGraph> StateHandleProvider =
+			new LocalStateHandleProvider<>();
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (ZooKeeper != null) {
+			ZooKeeper.shutdown();
+		}
+	}
+
+	@Before
+	public void cleanUp() throws Exception {
+		ZooKeeper.deleteAll();
+	}
+
+	/**
+	 * Tests adding, updating (same job ID) and removing a job graph, and that removing an
+	 * already removed job graph does not fail.
+	 */
+	@Test
+	public void testPutAndRemoveJobGraph() throws Exception {
+		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
+				ZooKeeper.createClient(), "/testPutAndRemoveJobGraph",
+				StateHandleProvider);
+
+		try {
+			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
+
+			jobGraphs.start(listener);
+
+			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
+
+			// Empty state
+			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+
+			// Add initial
+			jobGraphs.putJobGraph(jobGraph);
+
+			// Verify initial job graph
+			List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
+			assertEquals(1, actual.size());
+
+			verifyJobGraphs(jobGraph, actual.get(0));
+
+			// Update (same ID)
+			jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1);
+			jobGraphs.putJobGraph(jobGraph);
+
+			// Verify updated
+			actual = jobGraphs.recoverJobGraphs();
+			assertEquals(1, actual.size());
+
+			verifyJobGraphs(jobGraph, actual.get(0));
+
+			// Remove
+			jobGraphs.removeJobGraph(jobGraph.getJobId());
+
+			// Empty state
+			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+
+			// Our own addition may be reported to the listener at most once; our own removal
+			// must not be reported at all.
+			verify(listener, atMost(1)).onAddedJobGraph(any(JobID.class));
+			verify(listener, never()).onRemovedJobGraph(any(JobID.class));
+
+			// Don't fail if called again
+			jobGraphs.removeJobGraph(jobGraph.getJobId());
+		}
+		finally {
+			jobGraphs.stop();
+		}
+	}
+
+	/**
+	 * Tests that all added job graphs are recovered with their original contents.
+	 */
+	@Test
+	public void testRecoverJobGraphs() throws Exception {
+		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
+				ZooKeeper.createClient(), "/testRecoverJobGraphs", StateHandleProvider);
+
+		try {
+			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
+
+			jobGraphs.start(listener);
+
+			HashMap<JobID, SubmittedJobGraph> expected = new HashMap<>();
+			JobID[] jobIds = new JobID[] { new JobID(), new JobID(), new JobID() };
+
+			expected.put(jobIds[0], createSubmittedJobGraph(jobIds[0], 0));
+			expected.put(jobIds[1], createSubmittedJobGraph(jobIds[1], 1));
+			expected.put(jobIds[2], createSubmittedJobGraph(jobIds[2], 2));
+
+			// Add all
+			for (SubmittedJobGraph jobGraph : expected.values()) {
+				jobGraphs.putJobGraph(jobGraph);
+			}
+
+			List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
+
+			assertEquals(expected.size(), actual.size());
+
+			for (SubmittedJobGraph jobGraph : actual) {
+				assertTrue(expected.containsKey(jobGraph.getJobId()));
+
+				verifyJobGraphs(expected.get(jobGraph.getJobId()), jobGraph);
+
+				jobGraphs.removeJobGraph(jobGraph.getJobId());
+			}
+
+			// Empty state
+			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+
+			// Each of our own additions may be reported at most once; our own removals must
+			// not be reported at all.
+			verify(listener, atMost(expected.size())).onAddedJobGraph(any(JobID.class));
+			verify(listener, never()).onRemovedJobGraph(any(JobID.class));
+		}
+		finally {
+			jobGraphs.stop();
+		}
+	}
+
+	/**
+	 * Tests that a job graph added through another store instance on the same ZooKeeper path
+	 * is reported to this store's listener.
+	 */
+	@Test
+	public void testConcurrentAddJobGraph() throws Exception {
+		ZooKeeperSubmittedJobGraphStore jobGraphs = null;
+		ZooKeeperSubmittedJobGraphStore otherJobGraphs = null;
+
+		try {
+			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider);
+
+			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider);
+
+			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
+			SubmittedJobGraph otherJobGraph = createSubmittedJobGraph(new JobID(), 0);
+
+			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
+
+			final JobID[] actualOtherJobId = new JobID[1];
+			final CountDownLatch sync = new CountDownLatch(1);
+
+			doAnswer(new Answer<Void>() {
+				@Override
+				public Void answer(InvocationOnMock invocation) throws Throwable {
+					actualOtherJobId[0] = (JobID) invocation.getArguments()[0];
+					sync.countDown();
+
+					return null;
+				}
+			}).when(listener).onAddedJobGraph(any(JobID.class));
+
+			// Test
+			jobGraphs.start(listener);
+			otherJobGraphs.start(null);
+
+			jobGraphs.putJobGraph(jobGraph);
+
+			// Our own addition must not be reported to the listener
+			verify(listener, never()).onAddedJobGraph(any(JobID.class));
+			verify(listener, never()).onRemovedJobGraph(any(JobID.class));
+
+			// Add a job graph through the other store instance
+			otherJobGraphs.putJobGraph(otherJobGraph);
+
+			// Wait (bounded) for the listener to be called back
+			assertTrue("Listener was not notified about the job graph added by the other store " +
+							"within " + NOTIFICATION_TIMEOUT_SECONDS + " seconds.",
+					sync.await(NOTIFICATION_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
+			verify(listener, times(1)).onAddedJobGraph(any(JobID.class));
+			verify(listener, never()).onRemovedJobGraph(any(JobID.class));
+
+			assertEquals(otherJobGraph.getJobId(), actualOtherJobId[0]);
+		}
+		finally {
+			if (jobGraphs != null) {
+				jobGraphs.stop();
+			}
+
+			if (otherJobGraphs != null) {
+				otherJobGraphs.stop();
+			}
+		}
+	}
+
+	/**
+	 * Tests that a store cannot update a job graph which it neither added nor recovered itself.
+	 */
+	@Test(expected = IllegalStateException.class)
+	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
+		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider);
+
+		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider);
+
+		try {
+			jobGraphs.start(null);
+			otherJobGraphs.start(null);
+
+			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
+
+			jobGraphs.putJobGraph(jobGraph);
+
+			// The other store did not add or recover this job graph, so this must fail
+			otherJobGraphs.putJobGraph(jobGraph);
+		}
+		finally {
+			// Release the ZooKeeper resources of both stores (previously leaked by this test)
+			jobGraphs.stop();
+			otherJobGraphs.stop();
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a single-vertex job graph with the given ID, wrapped with a detached job info.
+	 *
+	 * @param jobId ID of the created job graph
+	 * @param start Start value passed to the {@link JobInfo}
+	 * @return The submitted job graph
+	 */
+	private static SubmittedJobGraph createSubmittedJobGraph(JobID jobId, long start) {
+		final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph");
+
+		final JobVertex jobVertex = new JobVertex("Test JobVertex");
+		jobVertex.setParallelism(1);
+
+		jobGraph.addVertex(jobVertex);
+
+		final JobInfo jobInfo = new JobInfo(
+				ActorRef.noSender(), ListeningBehaviour.DETACHED, start, Integer.MAX_VALUE);
+
+		return new SubmittedJobGraph(jobGraph, jobInfo);
+	}
+
+	/**
+	 * Asserts that both submitted job graphs agree in job name, job ID, listening behaviour
+	 * and start value.
+	 */
+	protected void verifyJobGraphs(SubmittedJobGraph expected, SubmittedJobGraph actual)
+			throws Exception {
+
+		JobGraph expectedJobGraph = expected.getJobGraph();
+		JobGraph actualJobGraph = actual.getJobGraph();
+
+		assertEquals(expectedJobGraph.getName(), actualJobGraph.getName());
+		assertEquals(expectedJobGraph.getJobID(), actualJobGraph.getJobID());
+
+		JobInfo expectedJobInfo = expected.getJobInfo();
+		JobInfo actualJobInfo = actual.getJobInfo();
+
+		assertEquals(expectedJobInfo.listeningBehaviour(), actualJobInfo.listeningBehaviour());
+		assertEquals(expectedJobInfo.start(), actualJobInfo.start());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 753bbab..bbd8fad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -25,19 +25,25 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.LeaderElectionUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -151,8 +157,19 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 	}
 
 	private Props createJobManagerProps(Configuration configuration) throws Exception {
-		LeaderElectionService leaderElectionService = LeaderElectionUtils.
-				createLeaderElectionService(configuration);
+		LeaderElectionService leaderElectionService;
+		if (RecoveryMode.fromConfig(configuration) == RecoveryMode.STANDALONE) {
+			leaderElectionService = new StandaloneLeaderElectionService();
+		}
+		else {
+			CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client,
+					configuration);
+		}
+
+		// We don't need recovery in this test
+		SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
+		CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
 
 		return Props.create(
 				TestingJobManager.class,
@@ -166,7 +183,9 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 				1L,
 				AkkaUtils.getDefaultTimeout(),
 				StreamingMode.BATCH_ONLY,
-				leaderElectionService
+				leaderElectionService,
+				submittedJobGraphStore,
+				checkpointRecoveryFactory
 		);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index c4fccd7..ea058f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -27,6 +27,8 @@ import java.util.UUID;
  */
 public class TestingLeaderElectionService implements LeaderElectionService, Serializable {
 
+	private static final long serialVersionUID = -8007939683948014574L;
+
 	private LeaderContender contender;
 	private boolean hasLeadership = false;
 
@@ -51,10 +53,12 @@ public class TestingLeaderElectionService implements LeaderElectionService, Seri
 	}
 
 	public void isLeader(UUID leaderSessionID) {
+		hasLeadership = true;
 		contender.grantLeadership(leaderSessionID);
 	}
 
 	public void notLeader() {
+		hasLeadership = false;
 		contender.revokeLeadership();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index bb60415..aae1840 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.util.LeaderElectionUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
@@ -41,7 +42,7 @@ import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
@@ -92,7 +93,11 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 		Thread thread;
 
+		CuratorFramework[] client = new CuratorFramework[2];
+
 		try {
+			client[0] = ZooKeeperUtils.startCuratorFramework(config);
+			client[1] = ZooKeeperUtils.startCuratorFramework(config);
 
 			InetSocketAddress wrongInetSocketAddress = new InetSocketAddress(InetAddress.getByName("1.1.1.1"), 1234);
 
@@ -116,7 +121,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 			String correctAddress = JobManager.getRemoteJobManagerAkkaURL(correctInetSocketAddress, Option.<String>empty());
 
-			faultyLeaderElectionService = LeaderElectionUtils.createLeaderElectionService(config);
+			faultyLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[0], config);
 			TestingContender wrongLeaderAddressContender = new TestingContender(wrongAddress, faultyLeaderElectionService);
 
 			faultyLeaderElectionService.start(wrongLeaderAddressContender);
@@ -127,7 +132,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 			thread.start();
 
-			leaderElectionService = LeaderElectionUtils.createLeaderElectionService(config);
+			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[1], config);
 			TestingContender correctLeaderAddressContender = new TestingContender(correctAddress, leaderElectionService);
 
 			Thread.sleep(sleepingTime);
@@ -155,6 +160,14 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 			if (leaderElectionService != null) {
 				leaderElectionService.stop();
 			}
+
+			if (client[0] != null) {
+				client[0].close();
+			}
+
+			if (client[1] != null) {
+				client[1].close();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 68575e5..087e0fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -78,7 +78,7 @@ public class CheckpointMessagesTest {
 		assertNotNull(copy.toString());
 	}
 	
-	private static class MyHandle implements StateHandle<Serializable> {
+	public static class MyHandle implements StateHandle<Serializable> {
 
 		private static final long serialVersionUID = 8128146204128728332L;