You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/26 16:06:04 UTC

flink git commit: [FLINK-4684] [checkpoints] Remove redundant class loader from CheckpointCoordinator

Repository: flink
Updated Branches:
  refs/heads/master 8fa313c39 -> 70e71c161


[FLINK-4684] [checkpoints] Remove redundant class loader from CheckpointCoordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70e71c16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70e71c16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70e71c16

Branch: refs/heads/master
Commit: 70e71c16177b40c2418e6a8ca0838bf117f6a926
Parents: 8fa313c
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 26 12:32:10 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 26 18:05:01 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   5 -
 .../flink/runtime/checkpoint/TaskState.java     |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   1 -
 .../apache/flink/runtime/state/StateUtil.java   |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 101 +++++++------------
 .../checkpoint/CheckpointStateRestoreTest.java  |   7 +-
 6 files changed, 38 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index fc40911..6a43ddf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -112,9 +112,6 @@ public class CheckpointCoordinator {
 	 * need to be ascending across job managers. */
 	private final CheckpointIDCounter checkpointIdCounter;
 
-	/** Class loader used to deserialize the state handles (as they may be user-defined) */
-	private final ClassLoader userClassLoader;
-
 	/** The base checkpoint interval. Actual trigger time may be affected by the
 	 * max concurrent checkpoints and minimum-pause values */
 	private final long baseInterval;
@@ -167,7 +164,6 @@ public class CheckpointCoordinator {
 			ExecutionVertex[] tasksToTrigger,
 			ExecutionVertex[] tasksToWaitFor,
 			ExecutionVertex[] tasksToCommitTo,
-			ClassLoader userClassLoader,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			SavepointStore savepointStore,
@@ -198,7 +194,6 @@ public class CheckpointCoordinator {
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.savepointStore = checkNotNull(savepointStore);
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
-		this.userClassLoader = checkNotNull(userClassLoader);
 		this.statsTracker = checkNotNull(statsTracker);
 
 		this.timer = new Timer("Checkpoint Timer", true);

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 657dd60..f5e3618 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -62,7 +62,7 @@ public class TaskState implements StateObject {
 				"Parallelism " + parallelism + " is not smaller or equal to max parallelism " + maxParallelism + ".");
 
 		this.jobVertexID = jobVertexID;
-		//preallocate lists of the required size, so that we can randomly set values to indexes
+
 		this.subtaskStates = new HashMap<>(parallelism);
 		this.keyGroupsStateHandles = new HashMap<>(parallelism);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index c3cf297..7c3fa0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -390,7 +390,6 @@ public class ExecutionGraph {
 				tasksToTrigger,
 				tasksToWaitFor,
 				tasksToCommitTo,
-				userClassLoader,
 				checkpointIDCounter,
 				checkpointStore,
 				savepointStore,

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index 3c5157e..aa28404 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -69,7 +69,7 @@ public class StateUtil {
 	 * occurring exceptions are suppressed and collected until the iteration is over and emitted as a single exception.
 	 *
 	 * @param handlesToDiscard State handles to discard. Passed iterable is allowed to deliver null values.
-	 * @throws Exception exception that is a collection of all suppressed exceptions that were caught during iteration
+	 * @throws IOException exception that is a collection of all suppressed exceptions that were caught during iteration
 	 */
 	public static void bestEffortCloseAllStateObjects(
 			Iterable<? extends StateObject> handlesToDiscard) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index bc61742..9adaa86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -42,10 +42,13 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.junit.Assert;
 import org.junit.Test;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
@@ -109,7 +112,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {},
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl),
 					new HeapSavepointStore(),
@@ -162,7 +164,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {},
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl),
 					new HeapSavepointStore(),
@@ -213,7 +214,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {},
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl),
 					new HeapSavepointStore(),
@@ -265,7 +265,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl),
 					new HeapSavepointStore(),
@@ -390,7 +389,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl),
 					new HeapSavepointStore(),
@@ -511,7 +509,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl),
 					new HeapSavepointStore(),
@@ -662,7 +659,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 					new ExecutionVertex[] { commitVertex },
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(2, cl),
 					new HeapSavepointStore(),
@@ -798,7 +794,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 					new ExecutionVertex[] { commitVertex },
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(10, cl),
 					new HeapSavepointStore(),
@@ -920,7 +915,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] { commitVertex },
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(2, cl),
 					new HeapSavepointStore(),
@@ -989,7 +983,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] { commitVertex },
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(2, cl),
 					new HeapSavepointStore(),
@@ -1069,7 +1062,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
 					new ExecutionVertex[] { commitVertex },
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(2, cl),
 					new HeapSavepointStore(),
@@ -1161,7 +1153,6 @@ public class CheckpointCoordinatorTest {
 					new ExecutionVertex[] { vertex1 },
 					new ExecutionVertex[] { vertex1 },
 					new ExecutionVertex[] { vertex1 },
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(2, cl),
 					new HeapSavepointStore(),
@@ -1246,7 +1237,6 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
-				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
 				new HeapSavepointStore(),
@@ -1384,7 +1374,6 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
-				cl,
 				counter,
 				new StandaloneCompletedCheckpointStore(10, cl),
 				new HeapSavepointStore(),
@@ -1470,8 +1459,9 @@ public class CheckpointCoordinatorTest {
 					maxConcurrentAttempts,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
-					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
-					(), new StandaloneCompletedCheckpointStore(2, cl),
+					new ExecutionVertex[] { commitVertex }, 
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl),
 					new HeapSavepointStore(),
 					new DisabledCheckpointStatsTracker());
 
@@ -1541,8 +1531,9 @@ public class CheckpointCoordinatorTest {
 					maxConcurrentAttempts, // max two concurrent checkpoints
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
-					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
-					(), new StandaloneCompletedCheckpointStore(2, cl),
+					new ExecutionVertex[] { commitVertex }, 
+					new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl),
 					new HeapSavepointStore(),
 					new DisabledCheckpointStatsTracker());
 
@@ -1621,7 +1612,8 @@ public class CheckpointCoordinatorTest {
 					2, // max two concurrent checkpoints
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
-					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
+					new ExecutionVertex[] { commitVertex },
+					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(2, cl),
 					new HeapSavepointStore(),
 					new DisabledCheckpointStatsTracker());
@@ -1672,7 +1664,6 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
-				cl,
 				checkpointIDCounter,
 				new StandaloneCompletedCheckpointStore(2, cl),
 				new HeapSavepointStore(),
@@ -1723,7 +1714,6 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
-				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
 				new HeapSavepointStore(),
@@ -1772,7 +1762,8 @@ public class CheckpointCoordinatorTest {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-		ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]);
+		ExecutionVertex[] arrayExecutionVertices = 
+				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -1784,7 +1775,6 @@ public class CheckpointCoordinatorTest {
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
-			cl,
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1, cl),
 			new HeapSavepointStore(),
@@ -1796,8 +1786,8 @@ public class CheckpointCoordinatorTest {
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
-		List<KeyGroupRange> keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
-		List<KeyGroupRange> keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+		List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+		List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 			ChainedStateHandle<StreamStateHandle> nonPartitionedState = generateStateForVertex(jobVertexID1, index);
@@ -1876,7 +1866,7 @@ public class CheckpointCoordinatorTest {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-		ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]);
+		ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -1888,7 +1878,6 @@ public class CheckpointCoordinatorTest {
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
-			cl,
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1, cl),
 			new HeapSavepointStore(),
@@ -1900,8 +1889,8 @@ public class CheckpointCoordinatorTest {
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
-		List<KeyGroupRange> keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
-		List<KeyGroupRange> keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+		List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+		List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 			ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
@@ -1991,7 +1980,8 @@ public class CheckpointCoordinatorTest {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-		ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]);
+		ExecutionVertex[] arrayExecutionVertices = 
+				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2003,7 +1993,6 @@ public class CheckpointCoordinatorTest {
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
-			cl,
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1, cl),
 			new HeapSavepointStore(),
@@ -2015,8 +2004,10 @@ public class CheckpointCoordinatorTest {
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
-		List<KeyGroupRange> keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
-		List<KeyGroupRange> keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+		List<KeyGroupRange> keyGroupPartitions1 = 
+				CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+		List<KeyGroupRange> keyGroupPartitions2 = 
+				CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 			ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
@@ -2110,7 +2101,8 @@ public class CheckpointCoordinatorTest {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-		ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]);
+		ExecutionVertex[] arrayExecutionVertices = 
+				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2122,7 +2114,6 @@ public class CheckpointCoordinatorTest {
 				arrayExecutionVertices,
 				arrayExecutionVertices,
 				arrayExecutionVertices,
-				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
 				new HeapSavepointStore(),
@@ -2134,8 +2125,10 @@ public class CheckpointCoordinatorTest {
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
-		List<KeyGroupRange> keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
-		List<KeyGroupRange> keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+		List<KeyGroupRange> keyGroupPartitions1 = 
+				CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+		List<KeyGroupRange> keyGroupPartitions2 = 
+				CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 			ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
@@ -2173,7 +2166,8 @@ public class CheckpointCoordinatorTest {
 
 		int newParallelism2 = 13;
 
-		List<KeyGroupRange> newKeyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+		List<KeyGroupRange> newKeyGroupPartitions2 = 
+				CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, newParallelism2);
 
 		final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
 				jobVertexID1,
@@ -2207,37 +2201,12 @@ public class CheckpointCoordinatorTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	static void sendAckMessageToCoordinator(
-			CheckpointCoordinator coord,
-			long checkpointId, JobID jid,
-			ExecutionJobVertex jobVertex,
-			JobVertexID jobVertexID,
-			List<KeyGroupRange> keyGroupPartitions) throws Exception {
-
-		for (int index = 0; index < jobVertex.getParallelism(); index++) {
-			ChainedStateHandle<StreamStateHandle> state = generateStateForVertex(jobVertexID, index);
-			List<KeyGroupsStateHandle> keyGroupState = generateKeyGroupState(
-					jobVertexID,
-					keyGroupPartitions.get(index));
-
-			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					state,
-					keyGroupState);
-
-			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
-		}
-	}
-
 	public static List<KeyGroupsStateHandle> generateKeyGroupState(
 			JobVertexID jobVertexID,
 			KeyGroupRange keyGroupPartition) throws IOException {
 
-		KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupPartition);
 		List<Integer> testStatesLists = new ArrayList<>(keyGroupPartition.getNumberOfKeyGroups());
-		int runningGroupsOffset = 0;
+
 		// generate state for one keygroup
 		for (int keyGroupIndex : keyGroupPartition) {
 			Random random = new Random(jobVertexID.hashCode() + keyGroupIndex);
@@ -2270,8 +2239,7 @@ public class CheckpointCoordinatorTest {
 		//write all generated values in a single byte array, which is index by groupOffsetsInFinalByteArray
 		byte[] allSerializedValuesConcatenated = new byte[runningGroupsOffset];
 		runningGroupsOffset = 0;
-		byte[] old = null;
-		for(byte[] serializedGroupValue : serializedGroupValues) {
+		for (byte[] serializedGroupValue : serializedGroupValues) {
 			System.arraycopy(
 					serializedGroupValue,
 					0,
@@ -2279,7 +2247,6 @@ public class CheckpointCoordinatorTest {
 					runningGroupsOffset,
 					serializedGroupValue.length);
 			runningGroupsOffset += serializedGroupValue.length;
-			old = serializedGroupValue;
 		}
 
 		ByteStreamStateHandle allSerializedStatesHandle = new ByteStreamStateHandle(

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 5416292..a4896aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -32,12 +32,11 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-
 import org.apache.flink.runtime.util.SerializableObject;
+
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -101,7 +100,6 @@ public class CheckpointStateRestoreTest {
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[0],
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl),
 					new HeapSavepointStore(),
@@ -182,7 +180,6 @@ public class CheckpointStateRestoreTest {
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[0],
-					cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl),
 					new HeapSavepointStore(),
@@ -231,7 +228,7 @@ public class CheckpointStateRestoreTest {
 					Integer.MAX_VALUE,
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
-					new ExecutionVertex[0], cl,
+					new ExecutionVertex[0],
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl),
 					new HeapSavepointStore(),