You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/09/09 01:17:09 UTC

[flink] 01/06: [hotfix] Use camel format to replace abbreviations for the variable names.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3dc019eaff9a30fcd04eedde436811d605ca040b
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 22:59:28 2020 +0800

    [hotfix] Use camel format to replace abbreviations for the variable names.
---
 .../checkpoint/CheckpointCoordinatorTest.java      | 590 ++++++++++-----------
 1 file changed, 295 insertions(+), 295 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index eb96a1b..60cd9ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -169,22 +169,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		try {
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord = getCheckpointCoordinator();
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
 
 			// nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should not succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertTrue(checkpointFuture.isCompletedExceptionally());
 
 			// still, nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -195,22 +195,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testCheckpointAbortsIfTriggerTasksAreFinished() {
 		try {
-			CheckpointCoordinator coord = getCheckpointCoordinator();
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
 
 			// nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should not succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertTrue(checkpointFuture.isCompletedExceptionally());
 
 			// still, nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -221,22 +221,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
 		try {
-			CheckpointCoordinator coord = getCheckpointCoordinator();
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
 
 			// nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should not succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertTrue(checkpointFuture.isCompletedExceptionally());
 
 			// still, nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -246,7 +246,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 	@Test
 	public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() {
-		final JobID jid = new JobID();
+		final JobID jobId = new JobID();
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
 		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -259,24 +259,24 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
 
 		// set up the coordinator
-		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
+		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2, checkpointFailureManager);
 
 		try {
 			// trigger the checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkPointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkPointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkPointFuture.isCompletedExceptionally());
 
-			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+			long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
 			// acknowledge from one of the tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.areTasksFullyAcknowledged());
 
 			// decline checkpoint from the other task
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 
 			fail("Test failed.");
 		}
@@ -286,7 +286,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			assertEquals(errorMsg, e.getMessage());
 		} finally {
 			try {
-				coord.shutdown(JobStatus.FINISHED);
+				checkpointCoordinator.shutdown(JobStatus.FINISHED);
 			} catch (Exception e) {
 				e.printStackTrace();
 				fail(e.getMessage());
@@ -302,13 +302,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		final String errorMsg = "Exceeded checkpoint failure tolerance number!";
 		CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
-		CheckpointCoordinator coord = getCheckpointCoordinator(new JobID(), vertex1, vertex2, checkpointFailureManager);
+		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(new JobID(), vertex1, vertex2, checkpointFailureManager);
 
 		try {
-			coord.triggerCheckpoint(false);
+			checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 
-			coord.abortPendingCheckpoints(new CheckpointException(CHECKPOINT_EXPIRED));
+			checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CHECKPOINT_EXPIRED));
 
 			fail("Test failed.");
 		}
@@ -317,7 +317,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			assertTrue(e instanceof RuntimeException);
 			assertEquals(errorMsg, e.getMessage());
 		} finally {
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 	}
 
@@ -329,7 +329,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testTriggerAndDeclineCheckpointSimple() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock Execution vertices that receive the checkpoint trigger messages
 			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -338,29 +338,29 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
 
 			// validate that we have a pending checkpoint
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// we have one task scheduled that will cancel after timeout
 			assertEquals(1, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
-			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+			long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
 			assertNotNull(checkpoint);
 			assertEquals(checkpointId, checkpoint.getCheckpointId());
-			assertEquals(jid, checkpoint.getJobId());
+			assertEquals(jobId, checkpoint.getJobId());
 			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpoint.getOperatorStates().size());
@@ -372,36 +372,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, checkpoint.getCheckpointTimestamp(), CheckpointOptions.forCheckpointWithDefaultLocation());
 
 			// acknowledge from one of the tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location");
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), "Unknown location");
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.areTasksFullyAcknowledged());
 
 			// acknowledge the same task again (should not matter)
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location");
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), "Unknown location");
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.areTasksFullyAcknowledged());
 
 			// decline checkpoint from the other task, this should cancel the checkpoint
 			// and trigger a new one
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 			assertTrue(checkpoint.isDiscarded());
 
 			// the canceler is also removed
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
 			// validate that we have no new pending checkpoint
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// decline again, nothing should happen
 			// decline from the other task, nothing should happen
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
 			assertTrue(checkpoint.isDiscarded());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -417,7 +417,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testTriggerAndDeclineCheckpointComplex() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock Execution vertices that receive the checkpoint trigger messages
 			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -425,36 +425,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture1.isCompletedExceptionally());
 
 			// trigger second checkpoint, should also succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture2.isCompletedExceptionally());
 
 			// validate that we have a pending checkpoint
-			assertEquals(2, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(2, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
-			Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
+			Iterator<Map.Entry<Long, PendingCheckpoint>> it = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator();
 			long checkpoint1Id = it.next().getKey();
 			long checkpoint2Id = it.next().getKey();
-			PendingCheckpoint checkpoint1 = coord.getPendingCheckpoints().get(checkpoint1Id);
-			PendingCheckpoint checkpoint2 = coord.getPendingCheckpoints().get(checkpoint2Id);
+			PendingCheckpoint checkpoint1 = checkpointCoordinator.getPendingCheckpoints().get(checkpoint1Id);
+			PendingCheckpoint checkpoint2 = checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);
 
 			assertNotNull(checkpoint1);
 			assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
-			assertEquals(jid, checkpoint1.getJobId());
+			assertEquals(jobId, checkpoint1.getJobId());
 			assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpoint1.getOperatorStates().size());
@@ -463,7 +463,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			assertNotNull(checkpoint2);
 			assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
-			assertEquals(jid, checkpoint2.getJobId());
+			assertEquals(jobId, checkpoint2.getJobId());
 			assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpoint2.getOperatorStates().size());
@@ -483,25 +483,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			}
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
 			verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
 			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
 
 			assertTrue(checkpoint1.isDiscarded());
 
 			// validate that we have only one pending checkpoint left
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(1, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
 			// validate that it is the same second checkpoint from earlier
-			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
+			long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpointNew = checkpointCoordinator.getPendingCheckpoints().get(checkpointIdNew);
 			assertEquals(checkpoint2Id, checkpointIdNew);
 
 			assertNotNull(checkpointNew);
 			assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
-			assertEquals(jid, checkpointNew.getJobId());
+			assertEquals(jobId, checkpointNew.getJobId());
 			assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpointNew.getOperatorStates().size());
@@ -511,15 +511,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			// decline again, nothing should happen
 			// decline from the other task, nothing should happen
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
 			assertTrue(checkpoint1.isDiscarded());
 
 			// will not notify abort message again
 			verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
 			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -530,7 +530,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testTriggerAndConfirmSimpleCheckpoint() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock Execution vertices that receive the checkpoint trigger messages
 			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -539,28 +539,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
 
 			// validate that we have a pending checkpoint
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(1, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
-			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+			long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
 			assertNotNull(checkpoint);
 			assertEquals(checkpointId, checkpoint.getCheckpointId());
-			assertEquals(jid, checkpoint.getJobId());
+			assertEquals(jobId, checkpoint.getJobId());
 			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpoint.getOperatorStates().size());
@@ -583,8 +583,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
 
 			// acknowledge from one of the tasks
-			AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
-			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+			AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertFalse(checkpoint.isDiscarded());
@@ -592,21 +592,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(taskOperatorSubtaskStates2, never()).registerSharedStates(any(SharedStateRegistry.class));
 
 			// acknowledge the same task again (should not matter)
-			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.areTasksFullyAcknowledged());
 			verify(subtaskState2, never()).registerSharedStates(any(SharedStateRegistry.class));
 
 			// acknowledge the other task.
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
 
 			// the checkpoint is internally converted to a successful checkpoint and the
 			// pending checkpoint object is disposed
 			assertTrue(checkpoint.isDiscarded());
 
 			// the now we should have a completed checkpoint
-			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 			// the canceler should be removed now
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
@@ -623,27 +623,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), any(Long.class), any(CheckpointOptions.class));
 			}
 
-			CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
-			assertEquals(jid, success.getJobId());
+			CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+			assertEquals(jobId, success.getJobId());
 			assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
 			assertEquals(2, success.getOperatorStates().size());
 
 			// ---------------
 			// trigger another checkpoint and see that this one replaces the other checkpoint
 			// ---------------
-			coord.triggerCheckpoint(false);
+			checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 
-			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+			long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
-			CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
-			assertEquals(jid, successNew.getJobId());
+			CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+			assertEquals(jobId, successNew.getJobId());
 			assertEquals(checkpointIdNew, successNew.getCheckpointID());
 			assertTrue(successNew.getOperatorStates().isEmpty());
 
@@ -656,7 +656,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), any(Long.class));
 			}
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -667,7 +667,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testMultipleConcurrentCheckpoints() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices
 
@@ -690,9 +690,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex1, triggerVertex2 })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 })
@@ -701,18 +701,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture1.isCompletedExceptionally());
 
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+			PendingCheckpoint pending1 = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
 			long checkpointId1 = pending1.getCheckpointId();
 
 			// trigger messages should have been sent
@@ -720,20 +720,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), any(Long.class), any(CheckpointOptions.class));
 
 			// acknowledge one of the three tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId1), TASK_MANAGER_LOCATION_INFO);
 
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture2.isCompletedExceptionally());
 
-			assertEquals(2, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			PendingCheckpoint pending2;
 			{
-				Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+				Iterator<PendingCheckpoint> all = checkpointCoordinator.getPendingCheckpoints().values().iterator();
 				PendingCheckpoint cc1 = all.next();
 				PendingCheckpoint cc2 = all.next();
 				pending2 = pending1 == cc1 ? cc2 : cc1;
@@ -746,44 +746,44 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			// we acknowledge the remaining two tasks from the first
 			// checkpoint and two tasks from the second checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
 
 			// now, the first checkpoint should be confirmed
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertTrue(pending1.isDiscarded());
 
 			// the first confirm message should be out
 			verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId1), any(Long.class));
 
 			// send the last remaining ack for the second checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId2), TASK_MANAGER_LOCATION_INFO);
 
 			// now, the second checkpoint should be confirmed
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertTrue(pending2.isDiscarded());
 
 			// the second commit message should be out
 			verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), any(Long.class));
 
 			// validate the committed checkpoints
-			List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+			List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
 
 			CompletedCheckpoint sc1 = scs.get(0);
 			assertEquals(checkpointId1, sc1.getCheckpointID());
-			assertEquals(jid, sc1.getJobId());
+			assertEquals(jobId, sc1.getJobId());
 			assertTrue(sc1.getOperatorStates().isEmpty());
 
 			CompletedCheckpoint sc2 = scs.get(1);
 			assertEquals(checkpointId2, sc2.getCheckpointID());
-			assertEquals(jid, sc2.getJobId());
+			assertEquals(jobId, sc2.getJobId());
 			assertTrue(sc2.getOperatorStates().isEmpty());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -794,7 +794,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testSuccessfulCheckpointSubsumesUnsuccessful() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices
 			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
@@ -816,9 +816,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex1, triggerVertex2 })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 })
@@ -827,18 +827,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture1.isCompletedExceptionally());
 
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+			PendingCheckpoint pending1 = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
 			long checkpointId1 = pending1.getCheckpointId();
 
 			// trigger messages should have been sent
@@ -861,21 +861,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			taskOperatorSubtaskStates13.putSubtaskStateByOperatorID(opID3, subtaskState13);
 
 			// acknowledge one of the three tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates12), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates12), TASK_MANAGER_LOCATION_INFO);
 
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
-				coord.triggerCheckpoint(false);
+				checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture2.isCompletedExceptionally());
 
-			assertEquals(2, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			PendingCheckpoint pending2;
 			{
-				Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+				Iterator<PendingCheckpoint> all = checkpointCoordinator.getPendingCheckpoints().values().iterator();
 				PendingCheckpoint cc1 = all.next();
 				PendingCheckpoint cc2 = all.next();
 				pending2 = pending1 == cc1 ? cc2 : cc1;
@@ -901,13 +901,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// we acknowledge one more task from the first checkpoint and the second
 			// checkpoint completely. The second checkpoint should then subsume the first checkpoint
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates23), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates23), TASK_MANAGER_LOCATION_INFO);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates21), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates21), TASK_MANAGER_LOCATION_INFO);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates11), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates11), TASK_MANAGER_LOCATION_INFO);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates22), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates22), TASK_MANAGER_LOCATION_INFO);
 
 			// now, the second checkpoint should be confirmed, and the first discarded
 			// actually both pending checkpoints are discarded, and the second has been transformed
@@ -915,8 +915,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			assertTrue(pending1.isDiscarded());
 			assertTrue(pending2.isDiscarded());
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// validate that all received subtask states in the first checkpoint have been discarded
 			verify(subtaskState11, times(1)).discardState();
@@ -928,20 +928,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(subtaskState23, never()).discardState();
 
 			// validate the committed checkpoints
-			List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+			List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
 			CompletedCheckpoint success = scs.get(0);
 			assertEquals(checkpointId2, success.getCheckpointID());
-			assertEquals(jid, success.getJobId());
+			assertEquals(jobId, success.getJobId());
 			assertEquals(3, success.getOperatorStates().size());
 
 			// the first confirm message should be out
 			verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), any(Long.class));
 
 			// send the last remaining ack for the first checkpoint. This should not do anything
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates13), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates13), TASK_MANAGER_LOCATION_INFO);
 			verify(subtaskState13, times(1)).discardState();
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 
 			// validate that the states in the second checkpoint have been discarded
 			verify(subtaskState21, times(1)).discardState();
@@ -958,7 +958,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testCheckpointTimeoutIsolated() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices
 
@@ -977,9 +977,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			// set up the coordinator
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2 })
 					.setTasksToCommitTo(new ExecutionVertex[] { commitVertex })
@@ -988,12 +988,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.build();
 
 			// trigger a checkpoint, partially acknowledged
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
+			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
 			assertFalse(checkpoint.isDiscarded());
 
 			OperatorID opID1 = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
@@ -1002,13 +1002,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			OperatorSubtaskState subtaskState1 = mock(OperatorSubtaskState.class);
 			taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
 
 			// triggers cancelling
 			manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
 			assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// validate that the received states have been discarded
 			verify(subtaskState1, times(1)).discardState();
@@ -1016,7 +1016,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// no confirm message must have been sent
 			verify(commitVertex.getCurrentExecutionAttempt(), times(0)).notifyCheckpointComplete(anyLong(), anyLong());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1027,7 +1027,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testHandleMessagesForNonExistingCheckpoints() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices and trigger some checkpoint
 
@@ -1041,9 +1041,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2 })
 					.setTasksToCommitTo(new ExecutionVertex[] { commitVertex })
@@ -1051,26 +1051,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
 
-			long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+			long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
 
 			// send some messages that do not belong to either the job or the any
 			// of the vertices that need to be acknowledged.
 			// non of the messages should throw an exception
 
 			// wrong job id
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 
 			// unknown checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, 1L), TASK_MANAGER_LOCATION_INFO);
 
 			// unknown ack vertex
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId), TASK_MANAGER_LOCATION_INFO);
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1103,7 +1103,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new CheckpointCoordinatorConfigurationBuilder()
 				.setMaxConcurrentCheckpoints(1)
 				.build();
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setJobId(jobId)
 				.setCheckpointCoordinatorConfiguration(chkConfig)
@@ -1113,13 +1113,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				.setTimer(manuallyTriggeredScheduledExecutor)
 				.build();
 
-		final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(checkpointFuture.isCompletedExceptionally());
 
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+		PendingCheckpoint pendingCheckpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
 
 		long checkpointId = pendingCheckpoint.getCheckpointId();
 
@@ -1130,7 +1130,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		taskOperatorSubtaskStatesTrigger.putSubtaskStateByOperatorID(opIDtrigger, subtaskStateTrigger);
 
 		// acknowledge the first trigger vertex
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), TASK_MANAGER_LOCATION_INFO);
 
 		// verify that the subtask state has not been discarded
 		verify(subtaskStateTrigger, never()).discardState();
@@ -1138,7 +1138,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		TaskStateSnapshot unknownSubtaskState = mock(TaskStateSnapshot.class);
 
 		// receive an acknowledge message for an unknown vertex
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// we should discard acknowledge messages from an unknown vertex belonging to our job
 		verify(unknownSubtaskState, times(1)).discardState();
@@ -1146,21 +1146,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		TaskStateSnapshot differentJobSubtaskState = mock(TaskStateSnapshot.class);
 
 		// receive an acknowledge message from an unknown job
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// we should not interfere with different jobs
 		verify(differentJobSubtaskState, never()).discardState();
 
 		// duplicate acknowledge message for the trigger vertex
 		TaskStateSnapshot triggerSubtaskState = mock(TaskStateSnapshot.class);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// duplicate acknowledge messages for a known vertex should not trigger discarding the state
 		verify(triggerSubtaskState, never()).discardState();
 
 		// let the checkpoint fail at the first ack vertex
 		reset(subtaskStateTrigger);
-		coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 
 		assertTrue(pendingCheckpoint.isDiscarded());
 
@@ -1170,14 +1170,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		TaskStateSnapshot ackSubtaskState = mock(TaskStateSnapshot.class);
 
 		// late acknowledge message from the second ack vertex
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// check that we also cleaned up this state
 		verify(ackSubtaskState, times(1)).discardState();
 
 		// receive an acknowledge message from an unknown job
 		reset(differentJobSubtaskState);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// we should not interfere with different jobs
 		verify(differentJobSubtaskState, never()).discardState();
@@ -1185,7 +1185,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		TaskStateSnapshot unknownSubtaskState2 = mock(TaskStateSnapshot.class);
 
 		// receive an acknowledge message for an unknown vertex
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2), TASK_MANAGER_LOCATION_INFO);
 
 		// we should discard acknowledge messages from an unknown vertex belonging to our job
 		verify(unknownSubtaskState2, times(1)).discardState();
@@ -1208,7 +1208,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 	@Test
 	public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
-		final JobID jid = new JobID();
+		final JobID jobId = new JobID();
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
 		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -1217,26 +1217,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
 
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 		// trigger the first checkpoint. this should succeed
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
-		CompletableFuture<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(savepointFuture.isDone());
 
 		// validate that we have a pending savepoint
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-		long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-		PendingCheckpoint pending = coord.getPendingCheckpoints().get(checkpointId);
+		long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		PendingCheckpoint pending = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
 		assertNotNull(pending);
 		assertEquals(checkpointId, pending.getCheckpointId());
-		assertEquals(jid, pending.getJobId());
+		assertEquals(jobId, pending.getJobId());
 		assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
 		assertEquals(0, pending.getNumberOfAcknowledgedTasks());
 		assertEquals(0, pending.getOperatorStates().size());
@@ -1254,8 +1254,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
 
 		// acknowledge from one of the tasks
-		AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
-		coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+		AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+		checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
 		assertEquals(1, pending.getNumberOfAcknowledgedTasks());
 		assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
 		assertFalse(pending.isDiscarded());
@@ -1263,13 +1263,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		assertFalse(savepointFuture.isDone());
 
 		// acknowledge the same task again (should not matter)
-		coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
 		assertFalse(pending.isDiscarded());
 		assertFalse(pending.areTasksFullyAcknowledged());
 		assertFalse(savepointFuture.isDone());
 
 		// acknowledge the other task.
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
 
 		// the checkpoint is internally converted to a successful checkpoint and the
 		// pending checkpoint object is disposed
@@ -1277,8 +1277,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		assertNotNull(savepointFuture.get());
 
 		// the now we should have a completed checkpoint
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		// validate that the relevant tasks got a confirmation message
 		{
@@ -1292,27 +1292,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(subtaskState2, times(1)).registerSharedStates(any(SharedStateRegistry.class));
 		}
 
-		CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
-		assertEquals(jid, success.getJobId());
+		CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+		assertEquals(jobId, success.getJobId());
 		assertEquals(pending.getCheckpointId(), success.getCheckpointID());
 		assertEquals(2, success.getOperatorStates().size());
 
 		// ---------------
 		// trigger another checkpoint and see that this one replaces the other checkpoint
 		// ---------------
-		savepointFuture = coord.triggerSavepoint(savepointDir);
+		savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(savepointFuture.isDone());
 
-		long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+		long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
 
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-		CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
-		assertEquals(jid, successNew.getJobId());
+		CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+		assertEquals(jobId, successNew.getJobId());
 		assertEquals(checkpointIdNew, successNew.getCheckpointID());
 		assertTrue(successNew.getOperatorStates().isEmpty());
 		assertNotNull(savepointFuture.get());
@@ -1330,7 +1330,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), any(Long.class));
 		}
 
-		coord.shutdown(JobStatus.FINISHED);
+		checkpointCoordinator.shutdown(JobStatus.FINISHED);
 	}
 
 	/**
@@ -1341,7 +1341,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	 */
 	@Test
 	public void testSavepointsAreNotSubsumed() throws Exception {
-		final JobID jid = new JobID();
+		final JobID jobId = new JobID();
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
 		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -1352,9 +1352,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
-				.setJobId(jid)
+				.setJobId(jobId)
 				.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
 				.setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
 				.setCheckpointIDCounter(counter)
@@ -1365,67 +1365,67 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
 		// Trigger savepoint and checkpoint
-		CompletableFuture<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture1 = checkpointCoordinator.triggerSavepoint(savepointDir);
 
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		long savepointId1 = counter.getLast();
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-		CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+		CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertEquals(2, coord.getNumberOfPendingCheckpoints());
+		assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 		assertFalse(checkpointFuture1.isCompletedExceptionally());
 
-		CompletableFuture<CompletedCheckpoint> checkpointFuture2 = coord.triggerCheckpoint(false);
+		CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(checkpointFuture2.isCompletedExceptionally());
 		long checkpointId2 = counter.getLast();
-		assertEquals(3, coord.getNumberOfPendingCheckpoints());
+		assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		// 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
 
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+		assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDiscarded());
 		assertFalse(savepointFuture1.isDone());
 
-		CompletableFuture<CompletedCheckpoint> checkpointFuture3 = coord.triggerCheckpoint(false);
+		CompletableFuture<CompletedCheckpoint> checkpointFuture3 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(checkpointFuture3.isCompletedExceptionally());
-		assertEquals(2, coord.getNumberOfPendingCheckpoints());
+		assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-		CompletableFuture<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture2 = checkpointCoordinator.triggerSavepoint(savepointDir);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		long savepointId2 = counter.getLast();
 		assertFalse(savepointFuture2.isCompletedExceptionally());
-		assertEquals(3, coord.getNumberOfPendingCheckpoints());
+		assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
 
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
-		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
-		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+		assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
 		assertFalse(savepointFuture1.isDone());
 		assertNotNull(savepointFuture2.get());
 
 		// Ack first savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
 
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(3, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 		assertNotNull(savepointFuture1.get());
 	}
 
 	private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices and trigger some checkpoint
 			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
@@ -1457,9 +1457,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setMinPauseBetweenCheckpoints(0L) // no extra delay
 					.setMaxConcurrentCheckpoints(maxConcurrentAttempts)
 					.build();
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(chkConfig)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex })
@@ -1468,7 +1468,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			coord.startCheckpointScheduler();
+			checkpointCoordinator.startCheckpointScheduler();
 
 			for (int i = 0; i < maxConcurrentAttempts; i++) {
 				manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
@@ -1481,7 +1481,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			// now, once we acknowledge one checkpoint, it should trigger the next one
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID, 1L), TASK_MANAGER_LOCATION_INFO);
 
 			final Collection<ScheduledFuture<?>> periodicScheduledTasks =
 				manuallyTriggeredScheduledExecutor.getPeriodicScheduledTask();
@@ -1498,7 +1498,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertEquals(maxConcurrentAttempts + 1, numCalls.get());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1510,7 +1510,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	public void testMaxConcurrentAttempsWithSubsumption() {
 		try {
 			final int maxConcurrentAttempts = 2;
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices and trigger some checkpoint
 			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
@@ -1528,9 +1528,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setMinPauseBetweenCheckpoints(0L) // no extra delay
 					.setMaxConcurrentCheckpoints(maxConcurrentAttempts)
 					.build();
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(chkConfig)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex })
@@ -1539,37 +1539,37 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			coord.startCheckpointScheduler();
+			checkpointCoordinator.startCheckpointScheduler();
 
 			do {
 				manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
 				manuallyTriggeredScheduledExecutor.triggerAll();
 			}
-			while (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
+			while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
 
 			// validate that the pending checkpoints are there
-			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
-			assertNotNull(coord.getPendingCheckpoints().get(1L));
-			assertNotNull(coord.getPendingCheckpoints().get(2L));
+			assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(1L));
+			assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(2L));
 
 			// now we acknowledge the second checkpoint, which should subsume the first checkpoint
 			// and allow two more checkpoints to be triggered
 			// now, once we acknowledge one checkpoint, it should trigger the next one
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID, 2L), TASK_MANAGER_LOCATION_INFO);
 
 			// after a while, there should be the new checkpoints
 			do {
 				manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
 				manuallyTriggeredScheduledExecutor.triggerAll();
 			}
-			while (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
+			while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
 
 			// do the final check
-			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
-			assertNotNull(coord.getPendingCheckpoints().get(3L));
-			assertNotNull(coord.getPendingCheckpoints().get(4L));
+			assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L));
+			assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L));
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1580,7 +1580,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testPeriodicSchedulingWithInactiveTasks() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices and trigger some checkpoint
 			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
@@ -1601,9 +1601,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setMinPauseBetweenCheckpoints(0) // no extra delay
 					.setMaxConcurrentCheckpoints(2) // max two concurrent checkpoints
 					.build();
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(chkConfig)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex })
@@ -1612,12 +1612,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			coord.startCheckpointScheduler();
+			checkpointCoordinator.startCheckpointScheduler();
 
 			manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			// no checkpoint should have started so far
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 			// now move the state to RUNNING
 			currentState.set(ExecutionState.RUNNING);
@@ -1626,7 +1626,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
 			manuallyTriggeredScheduledExecutor.triggerAll();
 
-			assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
+			assertTrue(checkpointCoordinator.getNumberOfPendingCheckpoints() > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1651,7 +1651,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new CheckpointCoordinatorConfigurationBuilder()
 				.setMaxConcurrentCheckpoints(1) // max one checkpoint at a time => should not affect savepoints
 				.build();
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setJobId(jobId)
 				.setCheckpointCoordinatorConfiguration(chkConfig)
@@ -1667,7 +1667,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		// Trigger savepoints
 		for (int i = 0; i < numSavepoints; i++) {
-			savepointFutures.add(coord.triggerSavepoint(savepointDir));
+			savepointFutures.add(checkpointCoordinator.triggerSavepoint(savepointDir));
 		}
 
 		// After triggering multiple savepoints, all should in progress
@@ -1680,7 +1680,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// ACK all savepoints
 		long checkpointId = checkpointIDCounter.getLast();
 		for (int i = 0; i < numSavepoints; i++, checkpointId--) {
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 		}
 
 		// After ACKs, all should be completed
@@ -1699,7 +1699,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				.setMinPauseBetweenCheckpoints(100000000L) // very long min delay => should not affect savepoints
 				.setMaxConcurrentCheckpoints(1)
 				.build();
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setCheckpointCoordinatorConfiguration(chkConfig)
 				.setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
@@ -1708,10 +1708,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
-		CompletableFuture<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepoint0 = checkpointCoordinator.triggerSavepoint(savepointDir);
 		assertFalse("Did not trigger savepoint", savepoint0.isDone());
 
-		CompletableFuture<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepoint1 = checkpointCoordinator.triggerSavepoint(savepointDir);
 		assertFalse("Did not trigger savepoint", savepoint1.isDone());
 	}
 
@@ -1727,18 +1727,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new CheckpointCoordinatorConfigurationBuilder()
 					.setCheckpointRetentionPolicy(CheckpointRetentionPolicy.RETAIN_ON_FAILURE)
 					.build();
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
 					.setCheckpointCoordinatorConfiguration(chkConfig)
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
 			CompletableFuture<CompletedCheckpoint> checkpointFuture =
-				coord.triggerCheckpoint(false);
+				checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
 
-			for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) {
+			for (PendingCheckpoint checkpoint : checkpointCoordinator.getPendingCheckpoints().values()) {
 				CheckpointProperties props = checkpoint.getProps();
 				CheckpointProperties expected = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
 
@@ -1746,7 +1746,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			}
 
 			// the now we should have a completed checkpoint
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1948,20 +1948,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setTimer(manuallyTriggeredScheduledExecutor)
 				.build();
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		coord.setCheckpointStatsTracker(tracker);
+		checkpointCoordinator.setCheckpointStatsTracker(tracker);
 
 		when(tracker.reportPendingCheckpoint(anyLong(), anyLong(), any(CheckpointProperties.class)))
 			.thenReturn(mock(PendingCheckpointStats.class));
 
 		// Trigger a checkpoint and verify callback
 		CompletableFuture<CompletedCheckpoint> checkpointFuture =
-			coord.triggerCheckpoint(false);
+			checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(checkpointFuture.isCompletedExceptionally());
 
@@ -1977,7 +1977,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setCompletedCheckpointStore(store)
 				.setTimer(manuallyTriggeredScheduledExecutor)
@@ -1994,9 +1994,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new TestCompletedCheckpointStorageLocation()));
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		coord.setCheckpointStatsTracker(tracker);
+		checkpointCoordinator.setCheckpointStatsTracker(tracker);
 
-		assertTrue(coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
+		assertTrue(checkpointCoordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
 
 		verify(tracker, times(1))
 			.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
@@ -2005,7 +2005,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testSharedStateRegistrationOnRestore() throws Exception {
 
-		final JobID jid = new JobID();
+		final JobID jobId = new JobID();
 
 		final JobVertexID jobVertexID1 = new JobVertexID();
 
@@ -2029,9 +2029,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		final List<SharedStateRegistry> createdSharedStateRegistries = new ArrayList<>(2);
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
-				.setJobId(jid)
+				.setJobId(jobId)
 				.setTasks(arrayExecutionVertices)
 				.setCompletedCheckpointStore(store)
 				.setTimer(manuallyTriggeredScheduledExecutor)
@@ -2049,10 +2049,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
 
 		for (int i = 0; i < numCheckpoints; ++i) {
-			performIncrementalCheckpoint(jid, coord, jobVertex1, keyGroupPartitions1, i);
+			performIncrementalCheckpoint(jobId, checkpointCoordinator, jobVertex1, keyGroupPartitions1, i);
 		}
 
-		List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
+		List<CompletedCheckpoint> completedCheckpoints = checkpointCoordinator.getSuccessfulCheckpoints();
 		assertEquals(numCheckpoints, completedCheckpoints.size());
 
 		int sharedHandleCount = 0;
@@ -2112,7 +2112,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// restore the store
 		Set<ExecutionJobVertex> tasks = new HashSet<>();
 		tasks.add(jobVertex1);
-		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+		assertTrue(checkpointCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		// validate that all shared states are registered again after the recovery.
 		cp = 0;
@@ -2225,18 +2225,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	public void testTriggerCheckpointAfterCancel() throws Exception {
 		// set up the coordinator
 		TestingCheckpointIDCounter idCounter = new TestingCheckpointIDCounter();
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setCheckpointIDCounter(idCounter)
 				.setTimer(manuallyTriggeredScheduledExecutor)
 				.build();
-		idCounter.setOwner(coord);
+		idCounter.setOwner(checkpointCoordinator);
 
 		try {
 			// start the coordinator
-			coord.startCheckpointScheduler();
+			checkpointCoordinator.startCheckpointScheduler();
 			final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
-				coord.triggerCheckpoint(
+				checkpointCoordinator.triggerCheckpoint(
 					CheckpointProperties
 						.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 					null,
@@ -2254,7 +2254,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					checkpointExceptionOptional.get().getCheckpointFailureReason());
 			}
 		} finally {
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 	}
 
@@ -2387,18 +2387,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	private void performIncrementalCheckpoint(
-		JobID jid,
-		CheckpointCoordinator coord,
+		JobID jobId,
+		CheckpointCoordinator checkpointCoordinator,
 		ExecutionJobVertex jobVertex1,
 		List<KeyGroupRange> keyGroupPartitions1,
 		int cpSequenceNumber) throws Exception {
 
 		// trigger the checkpoint
-		coord.triggerCheckpoint(false);
+		checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 
-		assertEquals(1, coord.getPendingCheckpoints().size());
-		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		assertEquals(1, checkpointCoordinator.getPendingCheckpoints().size());
+		long checkpointId = Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());
 
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 
@@ -2445,13 +2445,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
 
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-				jid,
+				jobId,
 				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
 				checkpointId,
 				new CheckpointMetrics(),
 				taskStateSnapshot);
 
-			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
 		}
 	}