You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/09 16:46:52 UTC

[4/4] flink git commit: [FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in CheckpointCoordinator

[FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in CheckpointCoordinator

Handle exceptions from the CompletedCheckpointStore properly in the CheckpointCoordinator. This
means that in case of an exception, the completed checkpoint will be properly cleaned up and also
the triggering of subsequent checkpoints will be started.

This closes #2872.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c42d258
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c42d258
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c42d258

Branch: refs/heads/master
Commit: 0c42d258e9d9d30eeeee7d0f1487ef0ac8b90fa4
Parents: add3765
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 24 18:16:28 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 9 16:05:43 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 179 ++++++++++++-------
 .../runtime/checkpoint/CheckpointException.java |  35 ++++
 .../runtime/checkpoint/CompletedCheckpoint.java |   2 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |  65 +++----
 .../CheckpointCoordinatorFailureTest.java       | 141 +++++++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   |   2 +-
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/PendingCheckpointTest.java       |   1 -
 8 files changed, 318 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5f0fd74..3ce7a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -609,25 +609,18 @@ public class CheckpointCoordinator {
 	 *
 	 * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
 	 */
-	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
+	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
 		if (shutdown || message == null) {
 			return false;
 		}
 
 		if (!job.equals(message.getJob())) {
-			LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
+			LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
 			return false;
 		}
 
 		final long checkpointId = message.getCheckpointId();
 
-		CompletedCheckpoint completed = null;
-		PendingCheckpoint checkpoint;
-
-		// Flag indicating whether the ack message was for a known pending
-		// checkpoint.
-		boolean isPendingCheckpoint;
-
 		synchronized (lock) {
 			// we need to check inside the lock for being shutdown as well, otherwise we
 			// get races and invalid error log messages
@@ -635,10 +628,9 @@ public class CheckpointCoordinator {
 				return false;
 			}
 
-			checkpoint = pendingCheckpoints.get(checkpointId);
+			final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
 
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
-				isPendingCheckpoint = true;
 
 				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
 					case SUCCESS:
@@ -646,37 +638,7 @@ public class CheckpointCoordinator {
 							checkpointId, message.getTaskExecutionId(), message.getJob());
 
 						if (checkpoint.isFullyAcknowledged()) {
-
-							// record the time when this was completed, to calculate
-							// the 'min delay between checkpoints'
-							lastCheckpointCompletionNanos = System.nanoTime();
-
-							// complete the checkpoint structure
-							completed = checkpoint.finalizeCheckpoint();
-							completedCheckpointStore.addCheckpoint(completed);
-
-							LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
-								completed.getStateSize(), completed.getDuration());
-
-							if (LOG.isDebugEnabled()) {
-								StringBuilder builder = new StringBuilder();
-								builder.append("Checkpoint state: ");
-								for (TaskState state : completed.getTaskStates().values()) {
-									builder.append(state);
-									builder.append(", ");
-								}
-								// Remove last two chars ", "
-								builder.delete(builder.length() - 2, builder.length());
-
-								LOG.debug(builder.toString());
-							}
-
-							pendingCheckpoints.remove(checkpointId);
-							rememberRecentCheckpointId(checkpointId);
-
-							dropSubsumedCheckpoints(completed.getCheckpointID());
-
-							triggerQueuedRequests();
+							completePendingCheckpoint(checkpoint);
 						}
 						break;
 					case DUPLICATE:
@@ -700,6 +662,8 @@ public class CheckpointCoordinator {
 
 						discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
 				}
+
+				return true;
 			}
 			else if (checkpoint != null) {
 				// this should not happen
@@ -707,39 +671,106 @@ public class CheckpointCoordinator {
 						"Received message for discarded but non-removed checkpoint " + checkpointId);
 			}
 			else {
+				boolean wasPendingCheckpoint;
+
 				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
 				if (recentPendingCheckpoints.contains(checkpointId)) {
-					isPendingCheckpoint = true;
+					wasPendingCheckpoint = true;
 					LOG.warn("Received late message for now expired checkpoint attempt {} from " +
 						"{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
 				}
 				else {
 					LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
 						checkpointId, message.getTaskExecutionId(), message.getJob());
-					isPendingCheckpoint = false;
+					wasPendingCheckpoint = false;
 				}
 
 				// try to discard the state so that we don't have lingering state lying around
 				discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
+
+				return wasPendingCheckpoint;
 			}
 		}
+	}
 
-		// send the confirmation messages to the necessary targets. we do this here
-		// to be outside the lock scope
-		if (completed != null) {
-			final long timestamp = completed.getTimestamp();
+	/**
+	 * Try to complete the given pending checkpoint.
+	 *
+	 * Important: This method should only be called in the checkpoint lock scope.
+	 *
+	 * @param pendingCheckpoint to complete
+	 * @throws CheckpointException if the completion failed
+	 */
+	private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
+		final long checkpointId = pendingCheckpoint.getCheckpointId();
+		CompletedCheckpoint completedCheckpoint = null;
 
-			for (ExecutionVertex ev : tasksToCommitTo) {
-				Execution ee = ev.getCurrentExecutionAttempt();
-				if (ee != null) {
-					ee.notifyCheckpointComplete(checkpointId, timestamp);
-				}
+		try {
+			completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
+
+			completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+			rememberRecentCheckpointId(checkpointId);
+			dropSubsumedCheckpoints(checkpointId);
+		} catch (Exception exception) {
+			// abort the current pending checkpoint if it has not been discarded yet
+			if (!pendingCheckpoint.isDiscarded()) {
+				pendingCheckpoint.abortError(exception);
 			}
 
-			statsTracker.onCompletedCheckpoint(completed);
+			if (completedCheckpoint != null) {
+				// we failed to store the completed checkpoint. Let's clean up
+				final CompletedCheckpoint cc = completedCheckpoint;
+
+				executor.execute(new Runnable() {
+					@Override
+					public void run() {
+						try {
+							cc.discard();
+						} catch (Exception nestedException) {
+							LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException);
+						}
+					}
+				});
+			}
+
+			throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
+		} finally {
+			pendingCheckpoints.remove(checkpointId);
+
+			triggerQueuedRequests();
 		}
+		
+		// record the time when this was completed, to calculate
+		// the 'min delay between checkpoints'
+		lastCheckpointCompletionNanos = System.nanoTime();
+
+		LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+			completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
 
-		return isPendingCheckpoint;
+		if (LOG.isDebugEnabled()) {
+			StringBuilder builder = new StringBuilder();
+			builder.append("Checkpoint state: ");
+			for (TaskState state : completedCheckpoint.getTaskStates().values()) {
+				builder.append(state);
+				builder.append(", ");
+			}
+			// Remove last two chars ", "
+			builder.delete(builder.length() - 2, builder.length());
+
+			LOG.debug(builder.toString());
+		}
+
+		final long timestamp = completedCheckpoint.getTimestamp();
+
+		for (ExecutionVertex ev : tasksToCommitTo) {
+			Execution ee = ev.getCurrentExecutionAttempt();
+			if (ee != null) {
+				ee.notifyCheckpointComplete(checkpointId, timestamp);
+			}
+		}
+
+		statsTracker.onCompletedCheckpoint(completedCheckpoint);
 	}
 
 	private void rememberRecentCheckpointId(long id) {
@@ -958,22 +989,34 @@ public class CheckpointCoordinator {
 		}
 	}
 
+	/**
+	 * Discards the given state object asynchronously belonging to the given job, execution attempt
+	 * id and checkpoint id.
+	 *
+	 * @param jobId identifying the job to which the state object belongs
+	 * @param executionAttemptID identifying the task to which the state object belongs
+	 * @param checkpointId of the state object
+	 * @param stateObject to discard asynchronously
+	 */
 	private void discardState(
-		final JobID jobId,
-		final ExecutionAttemptID executionAttemptID,
-		final long checkpointId,
-		final StateObject stateObject) {
-		executor.execute(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					stateObject.discardState();
-				} catch (Exception e) {
+			final JobID jobId,
+			final ExecutionAttemptID executionAttemptID,
+			final long checkpointId,
+			final StateObject stateObject) {
+		
+		if (stateObject != null) {
+			executor.execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						stateObject.discardState();
+					} catch (Throwable throwable) {
 					LOG.warn("Could not properly discard state object of checkpoint {} " +
 						"belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId,
-						e);
+						throwable);
+					}
 				}
-			}
-		});
+			});
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
new file mode 100644
index 0000000..707878c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Base class for checkpoint related exceptions.
+ */
+public class CheckpointException extends Exception {
+
+	private static final long serialVersionUID = -4341865597039002540L;
+
+	public CheckpointException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CheckpointException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index ed65011..0e70b1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -151,7 +151,7 @@ public class CompletedCheckpoint implements Serializable {
 		}
 	}
 
-	private void discard() throws Exception {
+	void discard() throws Exception {
 		try {
 			if (externalPath != null) {
 				SavepointStore.removeSavepoint(externalPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index cfb59f6..e7df5bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -35,6 +35,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -198,47 +199,37 @@ public class PendingCheckpoint {
 		return onCompletionPromise;
 	}
 
-	public CompletedCheckpoint finalizeCheckpoint() throws Exception {
+	public CompletedCheckpoint finalizeCheckpoint() {
 		synchronized (lock) {
-			try {
-				if (discarded) {
-					throw new IllegalStateException("pending checkpoint is discarded");
+			Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
+
+			// Persist if required
+			String externalPath = null;
+			if (props.externalizeCheckpoint()) {
+				try {
+					Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
+					externalPath = SavepointStore.storeSavepoint(
+							targetDirectory,
+							savepoint);
+				} catch (IOException e) {
+					LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
 				}
-				if (notYetAcknowledgedTasks.isEmpty()) {
-					// Persist if required
-					String externalPath = null;
-					if (props.externalizeCheckpoint()) {
-						try {
-							Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
-							externalPath = SavepointStore.storeSavepoint(
-									targetDirectory,
-									savepoint);
-						} catch (Throwable t) {
-							LOG.error("Failed to persist checkpoints " + checkpointId + ".", t);
-						}
-					}
+			}
 
-					CompletedCheckpoint completed = new CompletedCheckpoint(
-							jobId,
-							checkpointId,
-							checkpointTimestamp,
-							System.currentTimeMillis(),
-							new HashMap<>(taskStates),
-							props,
-							externalPath);
+			CompletedCheckpoint completed = new CompletedCheckpoint(
+					jobId,
+					checkpointId,
+					checkpointTimestamp,
+					System.currentTimeMillis(),
+					new HashMap<>(taskStates),
+					props,
+					externalPath);
 
-					onCompletionPromise.complete(completed);
+			onCompletionPromise.complete(completed);
 
-					dispose(false);
+			dispose(false);
 
-					return completed;
-				} else {
-					throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
-				}
-			} catch (Throwable t) {
-				onCompletionPromise.completeExceptionally(t);
-				throw t;
-			}
+			return completed;
 		}
 	}
 
@@ -378,9 +369,8 @@ public class PendingCheckpoint {
 	private void dispose(boolean releaseState) {
 		synchronized (lock) {
 			try {
-				discarded = true;
 				numAcknowledgedTasks = -1;
-				if (releaseState) {
+				if (!discarded && releaseState) {
 					executor.execute(new Runnable() {
 						@Override
 						public void run() {
@@ -395,6 +385,7 @@ public class PendingCheckpoint {
 
 				}
 			} finally {
+				discarded = true;
 				taskStates.clear();
 				notYetAcknowledgedTasks.clear();
 				acknowledgedTasks.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
new file mode 100644
index 0000000..26db012
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PendingCheckpoint.class)
+public class CheckpointCoordinatorFailureTest extends TestLogger {
+
+	/**
+	 * Tests that a failure while storing a completed checkpoint in the completed checkpoint store
+	 * will properly fail the originating pending checkpoint and clean upt the completed checkpoint.
+	 */
+	@Test
+	public void testFailingCompletedCheckpointStoreAdd() throws Exception {
+		JobID jid = new JobID();
+
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptId);
+
+		final long triggerTimestamp = 1L;
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			new StandaloneCheckpointIDCounter(),
+			new FailingCompletedCheckpointStore(),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
+
+		coord.triggerCheckpoint(triggerTimestamp, false);
+
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+
+		assertFalse(pendingCheckpoint.isDiscarded());
+
+		final long checkpointId =coord.getPendingCheckpoints().keySet().iterator().next();
+
+		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, triggerTimestamp);
+		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointMetaData);
+
+		CompletedCheckpoint completedCheckpoint = mock(CompletedCheckpoint.class);
+		PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint);
+
+		try {
+			coord.receiveAcknowledgeMessage(acknowledgeMessage);
+			fail("Expected a checkpoint exception because the completed checkpoint store could not " +
+				"store the completed checkpoint.");
+		} catch (CheckpointException e) {
+			// ignore because we expected this exception
+		}
+
+		// make sure that the pending checkpoint has been discarded after we could not complete it
+		assertTrue(pendingCheckpoint.isDiscarded());
+
+		verify(completedCheckpoint).discard();
+	}
+
+	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
+
+		@Override
+		public void recover() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
+			throw new Exception("The failing completed checkpoint store failed again... :-(");
+		}
+
+		@Override
+		public CompletedCheckpoint getLatestCheckpoint() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void shutdown(JobStatus jobStatus) throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public int getNumberOfRetainedCheckpoints() {
+			return -1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 73eebcf..463c2ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2589,7 +2589,7 @@ public class CheckpointCoordinatorTest {
 		return executionJobVertex;
 	}
 
-	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
+	static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
 		return mockExecutionVertex(
 			attemptID,
 			new JobVertexID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 6b0d3f8..9f4bdba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -260,7 +260,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			}
 		}
 
-		private void discard() {
+		void discard() {
 			if (!isDiscarded) {
 				this.isDiscarded = true;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index e918965..be7b033 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -158,7 +158,6 @@ public class PendingCheckpointTest {
 		} catch (IllegalStateException ignored) {
 			// Expected
 		}
-		assertTrue(future.isDone());
 	}
 
 	/**