Posted to commits@flink.apache.org by pn...@apache.org on 2020/09/08 21:39:12 UTC

[flink] branch master updated (597f502 -> 99cd44f)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 597f502  [FLINK-14942][docs] remove the "shallow copy" note in "Modifying savepoints" section, fix example
     new d850871  [hotfix] Remove unused import in CheckpointCoordinatorTestingUtils
     new 44797b7  [hotfix] Use camel format to replace abbreviations for the variable names.
     new 9c73ce4  [hotfix] Throws the causing exception if a future is completed exceptionally unexpectedly.
     new 8c0bcca  [FLINK-18641][checkpointing] Fix CheckpointCoordinator to work with ExternallyInducedSource.
     new 4bb8f59  [FLINK-18641][runtime/checkpointing] Checkpoint the operator coordinators before triggering the master hooks.
     new 977454e  [hotfix] Make it more clear that the master hooks are also fired in the checkpoint timer thread.
     new 99cd44f  [hotfix] Add unit test for checkpoint failure.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/checkpoint/CheckpointCoordinator.java  |  45 +-
 .../flink/runtime/concurrent/FutureUtils.java      |  12 +
 .../checkpoint/CheckpointCoordinatorTest.java      | 885 +++++++++++++--------
 .../CheckpointCoordinatorTestingUtils.java         | 112 ++-
 4 files changed, 733 insertions(+), 321 deletions(-)


[flink] 06/07: [hotfix] Make it more clear that the master hooks are also fired in the checkpoint timer thread.

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 977454e126be3c44a4c8c9327c0846bf25255d52
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Aug 17 13:58:24 2020 +0800

    [hotfix] Make it more clear that the master hooks are also fired in the checkpoint timer thread.
---
 .../org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 420a62f..26d577d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -538,14 +538,14 @@ public class CheckpointCoordinator {
 			// This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
 			// ExternallyInducedSource is used.
 			final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
-				.thenCompose(ignored -> {
+				.thenComposeAsync(ignored -> {
 					// If the code reaches here, the pending checkpoint is guaranteed to be not null.
 					// We use FutureUtils.getWithoutException() to make compiler happy with checked
 					// exceptions in the signature.
 					PendingCheckpoint checkpoint =
 						FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
 					return snapshotMasterState(checkpoint);
-				});
+				}, timer);
 
 			FutureUtils.assertNoException(
 				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)

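Commit 977454e swaps `thenCompose` for `thenComposeAsync(..., timer)`. With the non-async variant, the master-hook lambda generally runs on whichever thread completes `coordinatorCheckpointsComplete`. Passing `timer` schedules it on the checkpoint timer executor instead. The following is a minimal, self-contained sketch of that difference using plain `java.util.concurrent`. The single-threaded executors and their thread names are illustrative assumptions, not Flink's actual timer setup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncComposeDemo {

    /**
     * Completes an upstream future from one thread and reports which thread ran the
     * composed stage. The executors are hypothetical stand-ins for the checkpoint timer
     * and for whatever thread finishes the coordinator checkpoints.
     */
    public static String threadRunningComposedStage(boolean useAsync) {
        ExecutorService timer =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "checkpoint-timer"));
        ExecutorService completer =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "completing-thread"));
        try {
            CompletableFuture<Void> upstream = new CompletableFuture<>();
            CompletableFuture<String> composed;
            if (useAsync) {
                // Like the patched code: the stage is always submitted to `timer`.
                composed = upstream.thenComposeAsync(
                    ignored -> CompletableFuture.completedFuture(Thread.currentThread().getName()),
                    timer);
            } else {
                // Like the original code: because `upstream` is still pending when the stage
                // is registered, the stage runs on the thread that later completes `upstream`.
                composed = upstream.thenCompose(
                    ignored -> CompletableFuture.completedFuture(Thread.currentThread().getName()));
            }
            // Complete the upstream future from a different thread and wait for that to happen.
            CompletableFuture.runAsync(() -> upstream.complete(null), completer).join();
            return composed.join();
        } finally {
            timer.shutdownNow();
            completer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadRunningComposedStage(false)); // completing-thread
        System.out.println(threadRunningComposedStage(true));  // checkpoint-timer
    }
}
```

Pinning the stage to one executor means the master hooks, like the coordinator checkpoints, always run on the timer thread regardless of which thread happened to complete the preceding future.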

[flink] 01/07: [hotfix] Remove unused import in CheckpointCoordinatorTestingUtils

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d850871518ed3cc7e4af4d6e16848d01eb8ac4c6
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Sep 8 17:51:29 2020 +0200

    [hotfix] Remove unused import in CheckpointCoordinatorTestingUtils
---
 .../flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java      | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index f3c4057..9f2c7bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;


[flink] 05/07: [FLINK-18641][runtime/checkpointing] Checkpoint the operator coordinators before triggering the master hooks.

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4bb8f59e737f0ce210b6d2d03be863759c352cb1
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Aug 17 13:55:25 2020 +0800

    [FLINK-18641][runtime/checkpointing] Checkpoint the operator coordinators before triggering the master hooks.
    
    We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
    This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
    ExternallyInducedSource is used.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  16 ++-
 .../checkpoint/CheckpointCoordinatorTest.java      |  34 +++++--
 .../CheckpointCoordinatorTestingUtils.java         | 111 ++++++++++++++++++++-
 3 files changed, 150 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 501322e..420a62f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -528,15 +528,25 @@ public class CheckpointCoordinator {
 							request.getOnCompletionFuture()),
 						timer);
 
-			final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
-					.thenCompose(this::snapshotMasterState);
-
 			final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
 					.thenComposeAsync((pendingCheckpoint) ->
 							OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
+			// We have to take the snapshot of the master hooks after the coordinator checkpoints has completed.
+			// This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+			// ExternallyInducedSource is used.
+			final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+				.thenCompose(ignored -> {
+					// If the code reaches here, the pending checkpoint is guaranteed to be not null.
+					// We use FutureUtils.getWithoutException() to make compiler happy with checked
+					// exceptions in the signature.
+					PendingCheckpoint checkpoint =
+						FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+					return snapshotMasterState(checkpoint);
+				});
+
 			FutureUtils.assertNoException(
 				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
 					.handleAsync(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index eef540c..16d7136 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -86,6 +86,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -2309,10 +2310,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 	/**
 	 * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
-	 * master hooks and finished before the master checkpoint.
+	 * master hooks and finished before the master checkpoint. Also make sure that the operator
+	 * coordinators are checkpointed before starting the task checkpoint.
 	 */
 	@Test
-	public void testTaskCheckpointTriggeredByMasterHooks() throws Exception {
+	public void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
 		final JobID jobId = new JobID();
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
@@ -2323,10 +2325,6 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
 			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
 
-		// set up the coordinator and validate the initial state
-		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
-		AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
-
 		OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
 		OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
 		TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
@@ -2336,6 +2334,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
 		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID2, subtaskState2);
 
+		// Create a mock OperatorCoordinatorCheckpointContext which completes the checkpoint immediately.
+		AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
+		OperatorCoordinatorCheckpointContext coordinatorCheckpointContext =
+			new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
+				.setOnCallingCheckpointCoordinator((checkpointId, result) -> {
+					coordCheckpointDone.set(true);
+					result.complete(new byte[0]);
+				})
+				.setOperatorID(opID1)
+				.build();
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder()
+			.setJobId(jobId)
+			.setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
+			.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
+			.setTimer(manuallyTriggeredScheduledExecutor)
+			.setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext))
+			.build();
+		AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
 		// Add a master hook which triggers and acks the task checkpoint immediately.
 		// In this case the task checkpoints would complete before the job master checkpoint completes.
 		checkpointCoordinator.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
@@ -2347,6 +2366,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			@Override
 			@Nullable
 			public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+				assertTrue("The coordinator checkpoint should have finished.", coordCheckpointDone.get());
 				// Acknowledge the checkpoint in the master hooks so the task snapshots complete before
 				// the master state snapshot completes.
 				checkpointIdRef.set(checkpointId);
@@ -2393,7 +2413,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// trigger the first checkpoint. this should succeed
 		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 		// now we should have a completed checkpoint
 		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 9f2c7bc..2dc46eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -75,7 +75,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -654,8 +657,10 @@ public class CheckpointCoordinatorTestingUtils {
 			return this;
 		}
 
-		public void setCoordinatorsToCheckpoint(Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint) {
+		public CheckpointCoordinatorBuilder setCoordinatorsToCheckpoint(
+				Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint) {
 			this.coordinatorsToCheckpoint = coordinatorsToCheckpoint;
+			return this;
 		}
 
 		public CheckpointCoordinatorBuilder setCheckpointIDCounter(
@@ -697,6 +702,11 @@ public class CheckpointCoordinatorTestingUtils {
 			return this;
 		}
 
+		public CheckpointCoordinatorBuilder setStateBackEnd(StateBackend stateBackEnd) {
+			this.checkpointStateBackend = stateBackEnd;
+			return this;
+		}
+
 		public CheckpointCoordinator build() {
 			return new CheckpointCoordinator(
 				jobId,
@@ -740,4 +750,103 @@ public class CheckpointCoordinatorTestingUtils {
 			return new String(serialized, StandardCharsets.UTF_8);
 		}
 	}
+
+	// ----------------- Mock class builders ---------------
+
+	public static final class MockOperatorCheckpointCoordinatorContextBuilder {
+		private BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator = null;
+		private Consumer<Long> onCallingAfterSourceBarrierInjection = null;
+		private OperatorID operatorID = null;
+
+		public MockOperatorCheckpointCoordinatorContextBuilder setOnCallingCheckpointCoordinator(
+				BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator) {
+			this.onCallingCheckpointCoordinator = onCallingCheckpointCoordinator;
+			return this;
+		}
+
+		public MockOperatorCheckpointCoordinatorContextBuilder setOnCallingAfterSourceBarrierInjection(
+				Consumer<Long> onCallingAfterSourceBarrierInjection) {
+			this.onCallingAfterSourceBarrierInjection = onCallingAfterSourceBarrierInjection;
+			return this;
+		}
+
+		public MockOperatorCheckpointCoordinatorContextBuilder setOperatorID(OperatorID operatorID) {
+			this.operatorID = operatorID;
+			return this;
+		}
+
+		public MockOperatorCoordinatorCheckpointContext build() {
+			return new MockOperatorCoordinatorCheckpointContext(
+				onCallingCheckpointCoordinator,
+				onCallingAfterSourceBarrierInjection,
+				operatorID);
+		}
+	}
+
+	// ----------------- Mock classes --------------------
+
+	/**
+	 * The class works together with {@link MockOperatorCheckpointCoordinatorContextBuilder} to
+	 * construct a mock OperatorCoordinatorCheckpointContext.
+	 */
+	public static final class MockOperatorCoordinatorCheckpointContext implements OperatorCoordinatorCheckpointContext {
+		private final BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator;
+		private final Consumer<Long> onCallingAfterSourceBarrierInjection;
+		private final OperatorID operatorID;
+
+		private MockOperatorCoordinatorCheckpointContext(
+				BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator,
+				Consumer<Long> onCallingAfterSourceBarrierInjection,
+				OperatorID operatorID) {
+			 this.onCallingCheckpointCoordinator = onCallingCheckpointCoordinator;
+			 this.onCallingAfterSourceBarrierInjection = onCallingAfterSourceBarrierInjection;
+			 this.operatorID = operatorID;
+		}
+
+		@Override
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
+			if (onCallingCheckpointCoordinator != null) {
+				onCallingCheckpointCoordinator.accept(checkpointId, result);
+			}
+		}
+
+		@Override
+		public void afterSourceBarrierInjection(long checkpointId) {
+			if (onCallingAfterSourceBarrierInjection != null) {
+				onCallingAfterSourceBarrierInjection.accept(checkpointId);
+			}
+		}
+
+		@Override
+		public void abortCurrentTriggering() {
+
+		}
+
+		@Override
+		public void checkpointComplete(long checkpointId) {
+
+		}
+
+		@Override
+		public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+
+		}
+
+		@Override
+		public OperatorID operatorId() {
+			return operatorID;
+		}
+
+		@Override
+		public int maxParallelism() {
+			return 1;
+		}
+
+		@Override
+		public int currentParallelism() {
+			return 1;
+		}
+	}
+
+
 }

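Commit 4bb8f59 chains the master-hook snapshot onto `coordinatorCheckpointsComplete` instead of `pendingCheckpointCompletableFuture`. As a result, the hooks (which, with an `ExternallyInducedSource`, can be what causes tasks to checkpoint) only fire once every operator coordinator has checkpointed. A minimal sketch of that ordering follows. `checkpointCoordinators` and `snapshotMasterState` here are hypothetical stand-ins rather than Flink's methods. The `AtomicBoolean` check mirrors the assertion that the new `testExternallyInducedSourceWithOperatorCoordinator` places inside its master hook.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckpointOrderingSketch {

    /** Hypothetical stand-in for checkpointing all operator coordinators. */
    static CompletableFuture<Void> checkpointCoordinators(
            long checkpointId, AtomicBoolean coordinatorsDone, List<String> log) {
        log.add("coordinators:" + checkpointId);
        coordinatorsDone.set(true);
        return CompletableFuture.completedFuture(null);
    }

    /** Hypothetical stand-in for firing the master hooks, which may induce task checkpoints. */
    static CompletableFuture<String> snapshotMasterState(
            long checkpointId, AtomicBoolean coordinatorsDone, List<String> log) {
        // Same invariant the new test asserts inside its master hook.
        if (!coordinatorsDone.get()) {
            throw new IllegalStateException("master hooks fired before the coordinator checkpoint");
        }
        log.add("masterHooks:" + checkpointId);
        return CompletableFuture.completedFuture("master-state-" + checkpointId);
    }

    /** Triggers one checkpoint and returns the order in which the two phases ran. */
    public static List<String> trigger(long checkpointId, Executor timer) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        AtomicBoolean coordinatorsDone = new AtomicBoolean(false);
        CompletableFuture<Long> pendingCheckpoint = CompletableFuture.completedFuture(checkpointId);

        CompletableFuture<Void> coordinatorCheckpointsComplete = pendingCheckpoint
            .thenComposeAsync(id -> checkpointCoordinators(id, coordinatorsDone, log), timer);

        // Chained on the coordinator future, not on pendingCheckpoint, so the hooks run second.
        // pendingCheckpoint is already complete at this point, so join() does not block.
        CompletableFuture<String> masterStatesComplete = coordinatorCheckpointsComplete
            .thenComposeAsync(
                ignored -> snapshotMasterState(pendingCheckpoint.join(), coordinatorsDone, log),
                timer);

        CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete).join();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // Runnable::run executes each stage on the calling thread.
        System.out.println(trigger(42L, Runnable::run)); // [coordinators:42, masterHooks:42]
    }
}
```

Had `masterStatesComplete` been derived from `pendingCheckpoint` directly (as before the commit), both stages could start as soon as the pending checkpoint existed, and the invariant check above could fail.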

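Commit 4bb8f59 also replaces `assertFalse(checkpointFuture.isCompletedExceptionally())` with `FutureUtils.throwIfCompletedExceptionally(checkpointFuture)`. That helper was introduced in 9c73ce4, which is described in a separate email. The effect is that a failing test reports the underlying cause instead of a bare assertion error. The sketch below shows one way such a helper can behave. The class name `FutureAssertions` and its exact semantics are assumptions, and Flink's actual `FutureUtils` implementation may differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class FutureAssertions {

    private FutureAssertions() {}

    /**
     * Hypothetical helper: if {@code future} has already failed, rethrow the original cause;
     * if it is still pending or completed normally, return without blocking.
     */
    public static void throwIfCompletedExceptionally(CompletableFuture<?> future) throws Exception {
        if (!future.isCompletedExceptionally()) {
            return;
        }
        try {
            // The future is already done, so get() returns or throws immediately.
            // A cancelled future surfaces here as a CancellationException.
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("checkpoint declined"));
        try {
            throwIfCompletedExceptionally(failed);
        } catch (IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage()); // rethrown: checkpoint declined
        }
        // Pending and successful futures pass through silently.
        throwIfCompletedExceptionally(new CompletableFuture<>());
        throwIfCompletedExceptionally(CompletableFuture.completedFuture("ok"));
    }
}
```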
[flink] 02/07: [hotfix] Use camel format to replace abbreviations for the variable names.

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 44797b7fef5b226e1b494e4c8590a1052466c07e
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 22:59:28 2020 +0800

    [hotfix] Use camel format to replace abbreviations for the variable names.
---
 .../checkpoint/CheckpointCoordinatorTest.java      | 590 ++++++++++-----------
 1 file changed, 295 insertions(+), 295 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index eb96a1b..60cd9ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -169,22 +169,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		try {
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord = getCheckpointCoordinator();
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
 
 			// nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should not succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertTrue(checkpointFuture.isCompletedExceptionally());
 
 			// still, nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -195,22 +195,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testCheckpointAbortsIfTriggerTasksAreFinished() {
 		try {
-			CheckpointCoordinator coord = getCheckpointCoordinator();
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
 
 			// nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should not succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertTrue(checkpointFuture.isCompletedExceptionally());
 
 			// still, nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -221,22 +221,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
 		try {
-			CheckpointCoordinator coord = getCheckpointCoordinator();
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
 
 			// nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should not succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertTrue(checkpointFuture.isCompletedExceptionally());
 
 			// still, nothing should be happening
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -246,7 +246,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 	@Test
 	public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() {
-		final JobID jid = new JobID();
+		final JobID jobId = new JobID();
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
 		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -259,24 +259,24 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
 
 		// set up the coordinator
-		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
+		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2, checkpointFailureManager);
 
 		try {
 			// trigger the checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkPointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkPointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkPointFuture.isCompletedExceptionally());
 
-			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+			long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
 			// acknowledge from one of the tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.areTasksFullyAcknowledged());
 
 			// decline checkpoint from the other task
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 
 			fail("Test failed.");
 		}
@@ -286,7 +286,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			assertEquals(errorMsg, e.getMessage());
 		} finally {
 			try {
-				coord.shutdown(JobStatus.FINISHED);
+				checkpointCoordinator.shutdown(JobStatus.FINISHED);
 			} catch (Exception e) {
 				e.printStackTrace();
 				fail(e.getMessage());
@@ -302,13 +302,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		final String errorMsg = "Exceeded checkpoint failure tolerance number!";
 		CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
-		CheckpointCoordinator coord = getCheckpointCoordinator(new JobID(), vertex1, vertex2, checkpointFailureManager);
+		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(new JobID(), vertex1, vertex2, checkpointFailureManager);
 
 		try {
-			coord.triggerCheckpoint(false);
+			checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 
-			coord.abortPendingCheckpoints(new CheckpointException(CHECKPOINT_EXPIRED));
+			checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CHECKPOINT_EXPIRED));
 
 			fail("Test failed.");
 		}
@@ -317,7 +317,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			assertTrue(e instanceof RuntimeException);
 			assertEquals(errorMsg, e.getMessage());
 		} finally {
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 	}
 
@@ -329,7 +329,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testTriggerAndDeclineCheckpointSimple() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock Execution vertices that receive the checkpoint trigger messages
 			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -338,29 +338,29 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
 
 			// validate that we have a pending checkpoint
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// we have one task scheduled that will cancel after timeout
 			assertEquals(1, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
-			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+			long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
 			assertNotNull(checkpoint);
 			assertEquals(checkpointId, checkpoint.getCheckpointId());
-			assertEquals(jid, checkpoint.getJobId());
+			assertEquals(jobId, checkpoint.getJobId());
 			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpoint.getOperatorStates().size());
@@ -372,36 +372,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, checkpoint.getCheckpointTimestamp(), CheckpointOptions.forCheckpointWithDefaultLocation());
 
 			// acknowledge from one of the tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location");
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), "Unknown location");
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.areTasksFullyAcknowledged());
 
 			// acknowledge the same task again (should not matter)
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location");
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), "Unknown location");
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.areTasksFullyAcknowledged());
 
 			// decline checkpoint from the other task, this should cancel the checkpoint
 			// and trigger a new one
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 			assertTrue(checkpoint.isDiscarded());
 
 			// the canceler is also removed
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
 			// validate that we have no new pending checkpoint
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// decline again, nothing should happen
 			// decline from the other task, nothing should happen
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
 			assertTrue(checkpoint.isDiscarded());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
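The acknowledge/decline sequence exercised by the hunk above can be pictured with a small, self-contained model. This is a sketch under stated assumptions: `SimplePendingCheckpoint` and its methods are hypothetical and do not reflect Flink's actual `PendingCheckpoint` implementation. It only illustrates the behaviour the assertions check: a duplicate ack is ignored, a single decline discards the whole checkpoint, and later declines have no further effect (so an abort is reported only once).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of pending-checkpoint bookkeeping.
// NOT Flink's PendingCheckpoint; names and semantics are illustrative only.
final class SimplePendingCheckpoint {
    private final Set<String> notYetAcknowledged;
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    SimplePendingCheckpoint(Set<String> tasksToWaitFor) {
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    /** Records an ack. Returns false (and changes nothing) for duplicate, unknown or late acks. */
    boolean acknowledge(String taskId) {
        if (discarded || !notYetAcknowledged.remove(taskId)) {
            return false;
        }
        acknowledged.add(taskId);
        return true;
    }

    /** A decline from any task discards the checkpoint. Returns true only for the first decline. */
    boolean decline(String taskId) {
        if (discarded) {
            return false; // already aborted: nothing to report again
        }
        discarded = true;
        return true;
    }

    int numAcknowledged() { return acknowledged.size(); }
    int numNotYetAcknowledged() { return notYetAcknowledged.size(); }
    boolean isDiscarded() { return discarded; }
    boolean isFullyAcknowledged() { return notYetAcknowledged.isEmpty(); }
}
```

The boolean results stand in for side effects such as sending an abort notification: the caller would act only when `decline` returns true.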
@@ -417,7 +417,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testTriggerAndDeclineCheckpointComplex() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock Execution vertices that receive the checkpoint trigger messages
 			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -425,36 +425,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture1.isCompletedExceptionally());
 
 			// trigger second checkpoint, should also succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture2.isCompletedExceptionally());
 
 			// validate that we have a pending checkpoint
-			assertEquals(2, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(2, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
-			Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
+			Iterator<Map.Entry<Long, PendingCheckpoint>> it = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator();
 			long checkpoint1Id = it.next().getKey();
 			long checkpoint2Id = it.next().getKey();
-			PendingCheckpoint checkpoint1 = coord.getPendingCheckpoints().get(checkpoint1Id);
-			PendingCheckpoint checkpoint2 = coord.getPendingCheckpoints().get(checkpoint2Id);
+			PendingCheckpoint checkpoint1 = checkpointCoordinator.getPendingCheckpoints().get(checkpoint1Id);
+			PendingCheckpoint checkpoint2 = checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);
 
 			assertNotNull(checkpoint1);
 			assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
-			assertEquals(jid, checkpoint1.getJobId());
+			assertEquals(jobId, checkpoint1.getJobId());
 			assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpoint1.getOperatorStates().size());
@@ -463,7 +463,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			assertNotNull(checkpoint2);
 			assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
-			assertEquals(jid, checkpoint2.getJobId());
+			assertEquals(jobId, checkpoint2.getJobId());
 			assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpoint2.getOperatorStates().size());
@@ -483,25 +483,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			}
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
 			verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
 			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
 
 			assertTrue(checkpoint1.isDiscarded());
 
 			// validate that we have only one pending checkpoint left
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(1, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
 			// validate that it is the same second checkpoint from earlier
-			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
+			long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpointNew = checkpointCoordinator.getPendingCheckpoints().get(checkpointIdNew);
 			assertEquals(checkpoint2Id, checkpointIdNew);
 
 			assertNotNull(checkpointNew);
 			assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
-			assertEquals(jid, checkpointNew.getJobId());
+			assertEquals(jobId, checkpointNew.getJobId());
 			assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpointNew.getOperatorStates().size());
@@ -511,15 +511,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			// decline again, nothing should happen
 			// decline from the other task, nothing should happen
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
 			assertTrue(checkpoint1.isDiscarded());
 
 			// will not notify abort message again
 			verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
 			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -530,7 +530,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testTriggerAndConfirmSimpleCheckpoint() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock Execution vertices that receive the checkpoint trigger messages
 			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -539,28 +539,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+			CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
 
 			// validate that we have a pending checkpoint
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(1, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
-			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+			long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
 			assertNotNull(checkpoint);
 			assertEquals(checkpointId, checkpoint.getCheckpointId());
-			assertEquals(jid, checkpoint.getJobId());
+			assertEquals(jobId, checkpoint.getJobId());
 			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(0, checkpoint.getOperatorStates().size());
@@ -583,8 +583,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
 
 			// acknowledge from one of the tasks
-			AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
-			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+			AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertFalse(checkpoint.isDiscarded());
@@ -592,21 +592,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(taskOperatorSubtaskStates2, never()).registerSharedStates(any(SharedStateRegistry.class));
 
 			// acknowledge the same task again (should not matter)
-			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.areTasksFullyAcknowledged());
 			verify(subtaskState2, never()).registerSharedStates(any(SharedStateRegistry.class));
 
 			// acknowledge the other task.
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
 
 			// the checkpoint is internally converted to a successful checkpoint and the
 			// pending checkpoint object is disposed
 			assertTrue(checkpoint.isDiscarded());
 
 			// now we should have a completed checkpoint
-			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 			// the canceler should be removed now
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
@@ -623,27 +623,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), any(Long.class), any(CheckpointOptions.class));
 			}
 
-			CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
-			assertEquals(jid, success.getJobId());
+			CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+			assertEquals(jobId, success.getJobId());
 			assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
 			assertEquals(2, success.getOperatorStates().size());
 
 			// ---------------
 			// trigger another checkpoint and see that this one replaces the other checkpoint
 			// ---------------
-			coord.triggerCheckpoint(false);
+			checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 
-			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+			long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
 
-			CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
-			assertEquals(jid, successNew.getJobId());
+			CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+			assertEquals(jobId, successNew.getJobId());
 			assertEquals(checkpointIdNew, successNew.getCheckpointID());
 			assertTrue(successNew.getOperatorStates().isEmpty());
 
@@ -656,7 +656,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), any(Long.class));
 			}
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
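The confirm test relies on a fully acknowledged checkpoint turning into a completed one, and on a newer completed checkpoint replacing the older one. A rough sketch, assuming a completed-checkpoint store that retains a single checkpoint (as the assertions on `getNumberOfRetainedSuccessfulCheckpoints()` suggest); `ToyCoordinator` and its methods are hypothetical and are not Flink's `CheckpointCoordinator` API:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy coordinator, NOT Flink's CheckpointCoordinator: it only tracks
// which tasks still have to acknowledge each pending checkpoint.
final class ToyCoordinator {
    private final Set<String> tasksToWaitFor;
    private final int maxRetained;
    // checkpoint id -> tasks that have not acknowledged yet
    private final Map<Long, Set<String>> pending = new HashMap<>();
    // completed checkpoint ids, oldest first
    private final Deque<Long> completed = new ArrayDeque<>();
    private long nextCheckpointId = 1;

    ToyCoordinator(Set<String> tasksToWaitFor, int maxRetained) {
        this.tasksToWaitFor = new HashSet<>(tasksToWaitFor);
        this.maxRetained = maxRetained;
    }

    long trigger() {
        long id = nextCheckpointId++;
        pending.put(id, new HashSet<>(tasksToWaitFor));
        return id;
    }

    void acknowledge(long checkpointId, String taskId) {
        Set<String> waiting = pending.get(checkpointId);
        if (waiting == null) {
            return; // unknown or already completed checkpoint: ignore
        }
        waiting.remove(taskId); // a duplicate ack removes nothing
        if (waiting.isEmpty()) {
            // fully acknowledged: pending -> completed
            pending.remove(checkpointId);
            completed.addLast(checkpointId);
            while (completed.size() > maxRetained) {
                completed.removeFirst(); // drop the oldest retained checkpoint
            }
        }
    }

    int numPending() { return pending.size(); }
    int numRetained() { return completed.size(); }
    long latestCompleted() { return completed.getLast(); }
}
```

With `maxRetained = 1`, completing a second checkpoint leaves exactly one retained checkpoint, mirroring the "this one replaces the other checkpoint" step of the test.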
@@ -667,7 +667,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testMultipleConcurrentCheckpoints() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices
 
@@ -690,9 +690,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex1, triggerVertex2 })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 })
@@ -701,18 +701,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture1.isCompletedExceptionally());
 
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+			PendingCheckpoint pending1 = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
 			long checkpointId1 = pending1.getCheckpointId();
 
 			// trigger messages should have been sent
@@ -720,20 +720,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), any(Long.class), any(CheckpointOptions.class));
 
 			// acknowledge one of the three tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId1), TASK_MANAGER_LOCATION_INFO);
 
 			// start the second checkpoint
 			// this should succeed as well
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture2.isCompletedExceptionally());
 
-			assertEquals(2, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			PendingCheckpoint pending2;
 			{
-				Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+				Iterator<PendingCheckpoint> all = checkpointCoordinator.getPendingCheckpoints().values().iterator();
 				PendingCheckpoint cc1 = all.next();
 				PendingCheckpoint cc2 = all.next();
 				pending2 = pending1 == cc1 ? cc2 : cc1;
@@ -746,44 +746,44 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			// we acknowledge the remaining two tasks from the first
 			// checkpoint and two tasks from the second checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1), TASK_MANAGER_LOCATION_INFO);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
 
 			// now, the first checkpoint should be confirmed
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertTrue(pending1.isDiscarded());
 
 			// the first confirm message should be out
 			verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId1), any(Long.class));
 
 			// send the last remaining ack for the second checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId2), TASK_MANAGER_LOCATION_INFO);
 
 			// now, the second checkpoint should be confirmed
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 			assertTrue(pending2.isDiscarded());
 
 			// the second commit message should be out
 			verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), any(Long.class));
 
 			// validate the committed checkpoints
-			List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+			List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
 
 			CompletedCheckpoint sc1 = scs.get(0);
 			assertEquals(checkpointId1, sc1.getCheckpointID());
-			assertEquals(jid, sc1.getJobId());
+			assertEquals(jobId, sc1.getJobId());
 			assertTrue(sc1.getOperatorStates().isEmpty());
 
 			CompletedCheckpoint sc2 = scs.get(1);
 			assertEquals(checkpointId2, sc2.getCheckpointID());
-			assertEquals(jid, sc2.getJobId());
+			assertEquals(jobId, sc2.getJobId());
 			assertTrue(sc2.getOperatorStates().isEmpty());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -794,7 +794,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testSuccessfulCheckpointSubsumesUnsuccessful() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices
 			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
@@ -816,9 +816,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex1, triggerVertex2 })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 })
@@ -827,18 +827,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture1.isCompletedExceptionally());
 
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-			PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+			PendingCheckpoint pending1 = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
 			long checkpointId1 = pending1.getCheckpointId();
 
 			// trigger messages should have been sent
@@ -861,21 +861,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			taskOperatorSubtaskStates13.putSubtaskStateByOperatorID(opID3, subtaskState13);
 
 			// acknowledge one of the three tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates12), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates12), TASK_MANAGER_LOCATION_INFO);
 
 			// start the second checkpoint
 			// this should succeed as well
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
-				coord.triggerCheckpoint(false);
+				checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture2.isCompletedExceptionally());
 
-			assertEquals(2, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			PendingCheckpoint pending2;
 			{
-				Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+				Iterator<PendingCheckpoint> all = checkpointCoordinator.getPendingCheckpoints().values().iterator();
 				PendingCheckpoint cc1 = all.next();
 				PendingCheckpoint cc2 = all.next();
 				pending2 = pending1 == cc1 ? cc2 : cc1;
@@ -901,13 +901,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// we acknowledge one more task from the first checkpoint and the second
 			// checkpoint completely. The second checkpoint should then subsume the first checkpoint
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates23), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates23), TASK_MANAGER_LOCATION_INFO);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates21), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates21), TASK_MANAGER_LOCATION_INFO);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates11), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates11), TASK_MANAGER_LOCATION_INFO);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates22), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates22), TASK_MANAGER_LOCATION_INFO);
 
 			// now, the second checkpoint should be confirmed, and the first discarded
 			// actually both pending checkpoints are discarded, and the second has been transformed
@@ -915,8 +915,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			assertTrue(pending1.isDiscarded());
 			assertTrue(pending2.isDiscarded());
 
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// validate that all received subtask states in the first checkpoint have been discarded
 			verify(subtaskState11, times(1)).discardState();
@@ -928,20 +928,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(subtaskState23, never()).discardState();
 
 			// validate the committed checkpoints
-			List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+			List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
 			CompletedCheckpoint success = scs.get(0);
 			assertEquals(checkpointId2, success.getCheckpointID());
-			assertEquals(jid, success.getJobId());
+			assertEquals(jobId, success.getJobId());
 			assertEquals(3, success.getOperatorStates().size());
 
 			// the first confirm message should be out
 			verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), any(Long.class));
 
 			// send the last remaining ack for the first checkpoint. This should not do anything
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates13), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates13), TASK_MANAGER_LOCATION_INFO);
 			verify(subtaskState13, times(1)).discardState();
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 
 			// validate that the states in the second checkpoint have been discarded
 			verify(subtaskState21, times(1)).discardState();
@@ -958,7 +958,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testCheckpointTimeoutIsolated() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices
 
@@ -977,9 +977,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			// set up the coordinator
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2 })
 					.setTasksToCommitTo(new ExecutionVertex[] { commitVertex })
@@ -988,12 +988,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.build();
 
 			// trigger a checkpoint, partially acknowledged
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
+			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
 			assertFalse(checkpoint.isDiscarded());
 
 			OperatorID opID1 = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
@@ -1002,13 +1002,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			OperatorSubtaskState subtaskState1 = mock(OperatorSubtaskState.class);
 			taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
 
 			// triggers cancelling
 			manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
 			assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// validate that the received states have been discarded
 			verify(subtaskState1, times(1)).discardState();
@@ -1016,7 +1016,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// no confirm message must have been sent
 			verify(commitVertex.getCurrentExecutionAttempt(), times(0)).notifyCheckpointComplete(anyLong(), anyLong());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1027,7 +1027,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testHandleMessagesForNonExistingCheckpoints() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices and trigger some checkpoint
 
@@ -1041,9 +1041,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2 })
 					.setTasksToCommitTo(new ExecutionVertex[] { commitVertex })
@@ -1051,26 +1051,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
 
-			long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+			long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
 
 			// send some messages that do not belong to either the job or the any
 			// of the vertices that need to be acknowledged.
 			// non of the messages should throw an exception
 
 			// wrong job id
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 
 			// unknown checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, 1L), TASK_MANAGER_LOCATION_INFO);
 
 			// unknown ack vertex
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId), TASK_MANAGER_LOCATION_INFO);
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1103,7 +1103,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new CheckpointCoordinatorConfigurationBuilder()
 				.setMaxConcurrentCheckpoints(1)
 				.build();
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setJobId(jobId)
 				.setCheckpointCoordinatorConfiguration(chkConfig)
@@ -1113,13 +1113,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				.setTimer(manuallyTriggeredScheduledExecutor)
 				.build();
 
-		final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(checkpointFuture.isCompletedExceptionally());
 
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+		PendingCheckpoint pendingCheckpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
 
 		long checkpointId = pendingCheckpoint.getCheckpointId();
 
@@ -1130,7 +1130,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		taskOperatorSubtaskStatesTrigger.putSubtaskStateByOperatorID(opIDtrigger, subtaskStateTrigger);
 
 		// acknowledge the first trigger vertex
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), TASK_MANAGER_LOCATION_INFO);
 
 		// verify that the subtask state has not been discarded
 		verify(subtaskStateTrigger, never()).discardState();
@@ -1138,7 +1138,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		TaskStateSnapshot unknownSubtaskState = mock(TaskStateSnapshot.class);
 
 		// receive an acknowledge message for an unknown vertex
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// we should discard acknowledge messages from an unknown vertex belonging to our job
 		verify(unknownSubtaskState, times(1)).discardState();
@@ -1146,21 +1146,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		TaskStateSnapshot differentJobSubtaskState = mock(TaskStateSnapshot.class);
 
 		// receive an acknowledge message from an unknown job
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// we should not interfere with different jobs
 		verify(differentJobSubtaskState, never()).discardState();
 
 		// duplicate acknowledge message for the trigger vertex
 		TaskStateSnapshot triggerSubtaskState = mock(TaskStateSnapshot.class);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// duplicate acknowledge messages for a known vertex should not trigger discarding the state
 		verify(triggerSubtaskState, never()).discardState();
 
 		// let the checkpoint fail at the first ack vertex
 		reset(subtaskStateTrigger);
-		coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 
 		assertTrue(pendingCheckpoint.isDiscarded());
 
@@ -1170,14 +1170,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		TaskStateSnapshot ackSubtaskState = mock(TaskStateSnapshot.class);
 
 		// late acknowledge message from the second ack vertex
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// check that we also cleaned up this state
 		verify(ackSubtaskState, times(1)).discardState();
 
 		// receive an acknowledge message from an unknown job
 		reset(differentJobSubtaskState);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
 		// we should not interfere with different jobs
 		verify(differentJobSubtaskState, never()).discardState();
@@ -1185,7 +1185,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		TaskStateSnapshot unknownSubtaskState2 = mock(TaskStateSnapshot.class);
 
 		// receive an acknowledge message for an unknown vertex
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2), TASK_MANAGER_LOCATION_INFO);
 
 		// we should discard acknowledge messages from an unknown vertex belonging to our job
 		verify(unknownSubtaskState2, times(1)).discardState();
@@ -1208,7 +1208,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 	@Test
 	public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
-		final JobID jid = new JobID();
+		final JobID jobId = new JobID();
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
 		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -1217,26 +1217,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
 
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
 		// trigger the first checkpoint. this should succeed
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
-		CompletableFuture<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(savepointFuture.isDone());
 
 		// validate that we have a pending savepoint
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-		long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-		PendingCheckpoint pending = coord.getPendingCheckpoints().get(checkpointId);
+		long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		PendingCheckpoint pending = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
 		assertNotNull(pending);
 		assertEquals(checkpointId, pending.getCheckpointId());
-		assertEquals(jid, pending.getJobId());
+		assertEquals(jobId, pending.getJobId());
 		assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
 		assertEquals(0, pending.getNumberOfAcknowledgedTasks());
 		assertEquals(0, pending.getOperatorStates().size());
@@ -1254,8 +1254,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
 
 		// acknowledge from one of the tasks
-		AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
-		coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+		AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+		checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
 		assertEquals(1, pending.getNumberOfAcknowledgedTasks());
 		assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
 		assertFalse(pending.isDiscarded());
@@ -1263,13 +1263,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		assertFalse(savepointFuture.isDone());
 
 		// acknowledge the same task again (should not matter)
-		coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
 		assertFalse(pending.isDiscarded());
 		assertFalse(pending.areTasksFullyAcknowledged());
 		assertFalse(savepointFuture.isDone());
 
 		// acknowledge the other task.
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
 
 		// the checkpoint is internally converted to a successful checkpoint and the
 		// pending checkpoint object is disposed
@@ -1277,8 +1277,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		assertNotNull(savepointFuture.get());
 
 		// the now we should have a completed checkpoint
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		// validate that the relevant tasks got a confirmation message
 		{
@@ -1292,27 +1292,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(subtaskState2, times(1)).registerSharedStates(any(SharedStateRegistry.class));
 		}
 
-		CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
-		assertEquals(jid, success.getJobId());
+		CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+		assertEquals(jobId, success.getJobId());
 		assertEquals(pending.getCheckpointId(), success.getCheckpointID());
 		assertEquals(2, success.getOperatorStates().size());
 
 		// ---------------
 		// trigger another checkpoint and see that this one replaces the other checkpoint
 		// ---------------
-		savepointFuture = coord.triggerSavepoint(savepointDir);
+		savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(savepointFuture.isDone());
 
-		long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+		long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
 
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-		CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
-		assertEquals(jid, successNew.getJobId());
+		CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+		assertEquals(jobId, successNew.getJobId());
 		assertEquals(checkpointIdNew, successNew.getCheckpointID());
 		assertTrue(successNew.getOperatorStates().isEmpty());
 		assertNotNull(savepointFuture.get());
@@ -1330,7 +1330,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), any(Long.class));
 		}
 
-		coord.shutdown(JobStatus.FINISHED);
+		checkpointCoordinator.shutdown(JobStatus.FINISHED);
 	}
 
 	/**
@@ -1341,7 +1341,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	 */
 	@Test
 	public void testSavepointsAreNotSubsumed() throws Exception {
-		final JobID jid = new JobID();
+		final JobID jobId = new JobID();
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
 		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -1352,9 +1352,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
-				.setJobId(jid)
+				.setJobId(jobId)
 				.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
 				.setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
 				.setCheckpointIDCounter(counter)
@@ -1365,67 +1365,67 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
 		// Trigger savepoint and checkpoint
-		CompletableFuture<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture1 = checkpointCoordinator.triggerSavepoint(savepointDir);
 
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		long savepointId1 = counter.getLast();
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-		CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+		CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertEquals(2, coord.getNumberOfPendingCheckpoints());
+		assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 		assertFalse(checkpointFuture1.isCompletedExceptionally());
 
-		CompletableFuture<CompletedCheckpoint> checkpointFuture2 = coord.triggerCheckpoint(false);
+		CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(checkpointFuture2.isCompletedExceptionally());
 		long checkpointId2 = counter.getLast();
-		assertEquals(3, coord.getNumberOfPendingCheckpoints());
+		assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		// 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
 
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+		assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDiscarded());
 		assertFalse(savepointFuture1.isDone());
 
-		CompletableFuture<CompletedCheckpoint> checkpointFuture3 = coord.triggerCheckpoint(false);
+		CompletableFuture<CompletedCheckpoint> checkpointFuture3 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(checkpointFuture3.isCompletedExceptionally());
-		assertEquals(2, coord.getNumberOfPendingCheckpoints());
+		assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-		CompletableFuture<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture2 = checkpointCoordinator.triggerSavepoint(savepointDir);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		long savepointId2 = counter.getLast();
 		assertFalse(savepointFuture2.isCompletedExceptionally());
-		assertEquals(3, coord.getNumberOfPendingCheckpoints());
+		assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
 
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
-		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
-		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+		assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
 		assertFalse(savepointFuture1.isDone());
 		assertNotNull(savepointFuture2.get());
 
 		// Ack first savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
+		checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
 
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(3, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 		assertNotNull(savepointFuture1.get());
 	}
 
 	private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices and trigger some checkpoint
 			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
@@ -1457,9 +1457,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setMinPauseBetweenCheckpoints(0L) // no extra delay
 					.setMaxConcurrentCheckpoints(maxConcurrentAttempts)
 					.build();
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(chkConfig)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex })
@@ -1468,7 +1468,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			coord.startCheckpointScheduler();
+			checkpointCoordinator.startCheckpointScheduler();
 
 			for (int i = 0; i < maxConcurrentAttempts; i++) {
 				manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
@@ -1481,7 +1481,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			// now, once we acknowledge one checkpoint, it should trigger the next one
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID, 1L), TASK_MANAGER_LOCATION_INFO);
 
 			final Collection<ScheduledFuture<?>> periodicScheduledTasks =
 				manuallyTriggeredScheduledExecutor.getPeriodicScheduledTask();
@@ -1498,7 +1498,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertEquals(maxConcurrentAttempts + 1, numCalls.get());
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1510,7 +1510,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	public void testMaxConcurrentAttempsWithSubsumption() {
 		try {
 			final int maxConcurrentAttempts = 2;
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices and trigger some checkpoint
 			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
@@ -1528,9 +1528,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setMinPauseBetweenCheckpoints(0L) // no extra delay
 					.setMaxConcurrentCheckpoints(maxConcurrentAttempts)
 					.build();
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(chkConfig)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex })
@@ -1539,37 +1539,37 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			coord.startCheckpointScheduler();
+			checkpointCoordinator.startCheckpointScheduler();
 
 			do {
 				manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
 				manuallyTriggeredScheduledExecutor.triggerAll();
 			}
-			while (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
+			while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
 
 			// validate that the pending checkpoints are there
-			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
-			assertNotNull(coord.getPendingCheckpoints().get(1L));
-			assertNotNull(coord.getPendingCheckpoints().get(2L));
+			assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(1L));
+			assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(2L));
 
 			// now we acknowledge the second checkpoint, which should subsume the first checkpoint
 			// and allow two more checkpoints to be triggered
 			// now, once we acknowledge one checkpoint, it should trigger the next one
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID, 2L), TASK_MANAGER_LOCATION_INFO);
 
 			// after a while, there should be the new checkpoints
 			do {
 				manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
 				manuallyTriggeredScheduledExecutor.triggerAll();
 			}
-			while (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
+			while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
 
 			// do the final check
-			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
-			assertNotNull(coord.getPendingCheckpoints().get(3L));
-			assertNotNull(coord.getPendingCheckpoints().get(4L));
+			assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L));
+			assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L));
 
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1580,7 +1580,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testPeriodicSchedulingWithInactiveTasks() {
 		try {
-			final JobID jid = new JobID();
+			final JobID jobId = new JobID();
 
 			// create some mock execution vertices and trigger some checkpoint
 			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
@@ -1601,9 +1601,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setMinPauseBetweenCheckpoints(0) // no extra delay
 					.setMaxConcurrentCheckpoints(2) // max two concurrent checkpoints
 					.build();
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
-					.setJobId(jid)
+					.setJobId(jobId)
 					.setCheckpointCoordinatorConfiguration(chkConfig)
 					.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
 					.setTasksToWaitFor(new ExecutionVertex[] { ackVertex })
@@ -1612,12 +1612,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
-			coord.startCheckpointScheduler();
+			checkpointCoordinator.startCheckpointScheduler();
 
 			manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			// no checkpoint should have started so far
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 			// now move the state to RUNNING
 			currentState.set(ExecutionState.RUNNING);
@@ -1626,7 +1626,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
 			manuallyTriggeredScheduledExecutor.triggerAll();
 
-			assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
+			assertTrue(checkpointCoordinator.getNumberOfPendingCheckpoints() > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1651,7 +1651,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new CheckpointCoordinatorConfigurationBuilder()
 				.setMaxConcurrentCheckpoints(1) // max one checkpoint at a time => should not affect savepoints
 				.build();
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setJobId(jobId)
 				.setCheckpointCoordinatorConfiguration(chkConfig)
@@ -1667,7 +1667,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		// Trigger savepoints
 		for (int i = 0; i < numSavepoints; i++) {
-			savepointFutures.add(coord.triggerSavepoint(savepointDir));
+			savepointFutures.add(checkpointCoordinator.triggerSavepoint(savepointDir));
 		}
 
 		// After triggering multiple savepoints, all should in progress
@@ -1680,7 +1680,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// ACK all savepoints
 		long checkpointId = checkpointIDCounter.getLast();
 		for (int i = 0; i < numSavepoints; i++, checkpointId--) {
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 		}
 
 		// After ACKs, all should be completed
@@ -1699,7 +1699,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				.setMinPauseBetweenCheckpoints(100000000L) // very long min delay => should not affect savepoints
 				.setMaxConcurrentCheckpoints(1)
 				.build();
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setCheckpointCoordinatorConfiguration(chkConfig)
 				.setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
@@ -1708,10 +1708,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
-		CompletableFuture<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepoint0 = checkpointCoordinator.triggerSavepoint(savepointDir);
 		assertFalse("Did not trigger savepoint", savepoint0.isDone());
 
-		CompletableFuture<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepoint1 = checkpointCoordinator.triggerSavepoint(savepointDir);
 		assertFalse("Did not trigger savepoint", savepoint1.isDone());
 	}
 
@@ -1727,18 +1727,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new CheckpointCoordinatorConfigurationBuilder()
 					.setCheckpointRetentionPolicy(CheckpointRetentionPolicy.RETAIN_ON_FAILURE)
 					.build();
-			CheckpointCoordinator coord =
+			CheckpointCoordinator checkpointCoordinator =
 				new CheckpointCoordinatorBuilder()
 					.setCheckpointCoordinatorConfiguration(chkConfig)
 					.setTimer(manuallyTriggeredScheduledExecutor)
 					.build();
 
 			CompletableFuture<CompletedCheckpoint> checkpointFuture =
-				coord.triggerCheckpoint(false);
+				checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
 			assertFalse(checkpointFuture.isCompletedExceptionally());
 
-			for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) {
+			for (PendingCheckpoint checkpoint : checkpointCoordinator.getPendingCheckpoints().values()) {
 				CheckpointProperties props = checkpoint.getProps();
 				CheckpointProperties expected = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
 
@@ -1746,7 +1746,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			}
 
 			// the now we should have a completed checkpoint
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -1948,20 +1948,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setTimer(manuallyTriggeredScheduledExecutor)
 				.build();
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		coord.setCheckpointStatsTracker(tracker);
+		checkpointCoordinator.setCheckpointStatsTracker(tracker);
 
 		when(tracker.reportPendingCheckpoint(anyLong(), anyLong(), any(CheckpointProperties.class)))
 			.thenReturn(mock(PendingCheckpointStats.class));
 
 		// Trigger a checkpoint and verify callback
 		CompletableFuture<CompletedCheckpoint> checkpointFuture =
-			coord.triggerCheckpoint(false);
+			checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertFalse(checkpointFuture.isCompletedExceptionally());
 
@@ -1977,7 +1977,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setCompletedCheckpointStore(store)
 				.setTimer(manuallyTriggeredScheduledExecutor)
@@ -1994,9 +1994,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new TestCompletedCheckpointStorageLocation()));
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		coord.setCheckpointStatsTracker(tracker);
+		checkpointCoordinator.setCheckpointStatsTracker(tracker);
 
-		assertTrue(coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
+		assertTrue(checkpointCoordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
 
 		verify(tracker, times(1))
 			.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
@@ -2005,7 +2005,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	@Test
 	public void testSharedStateRegistrationOnRestore() throws Exception {
 
-		final JobID jid = new JobID();
+		final JobID jobId = new JobID();
 
 		final JobVertexID jobVertexID1 = new JobVertexID();
 
@@ -2029,9 +2029,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		final List<SharedStateRegistry> createdSharedStateRegistries = new ArrayList<>(2);
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
-				.setJobId(jid)
+				.setJobId(jobId)
 				.setTasks(arrayExecutionVertices)
 				.setCompletedCheckpointStore(store)
 				.setTimer(manuallyTriggeredScheduledExecutor)
@@ -2049,10 +2049,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
 
 		for (int i = 0; i < numCheckpoints; ++i) {
-			performIncrementalCheckpoint(jid, coord, jobVertex1, keyGroupPartitions1, i);
+			performIncrementalCheckpoint(jobId, checkpointCoordinator, jobVertex1, keyGroupPartitions1, i);
 		}
 
-		List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
+		List<CompletedCheckpoint> completedCheckpoints = checkpointCoordinator.getSuccessfulCheckpoints();
 		assertEquals(numCheckpoints, completedCheckpoints.size());
 
 		int sharedHandleCount = 0;
@@ -2112,7 +2112,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// restore the store
 		Set<ExecutionJobVertex> tasks = new HashSet<>();
 		tasks.add(jobVertex1);
-		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+		assertTrue(checkpointCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		// validate that all shared states are registered again after the recovery.
 		cp = 0;
@@ -2225,18 +2225,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	public void testTriggerCheckpointAfterCancel() throws Exception {
 		// set up the coordinator
 		TestingCheckpointIDCounter idCounter = new TestingCheckpointIDCounter();
-		CheckpointCoordinator coord =
+		CheckpointCoordinator checkpointCoordinator =
 			new CheckpointCoordinatorBuilder()
 				.setCheckpointIDCounter(idCounter)
 				.setTimer(manuallyTriggeredScheduledExecutor)
 				.build();
-		idCounter.setOwner(coord);
+		idCounter.setOwner(checkpointCoordinator);
 
 		try {
 			// start the coordinator
-			coord.startCheckpointScheduler();
+			checkpointCoordinator.startCheckpointScheduler();
 			final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
-				coord.triggerCheckpoint(
+				checkpointCoordinator.triggerCheckpoint(
 					CheckpointProperties
 						.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 					null,
@@ -2254,7 +2254,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					checkpointExceptionOptional.get().getCheckpointFailureReason());
 			}
 		} finally {
-			coord.shutdown(JobStatus.FINISHED);
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
 		}
 	}
 
@@ -2387,18 +2387,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	private void performIncrementalCheckpoint(
-		JobID jid,
-		CheckpointCoordinator coord,
+		JobID jobId,
+		CheckpointCoordinator checkpointCoordinator,
 		ExecutionJobVertex jobVertex1,
 		List<KeyGroupRange> keyGroupPartitions1,
 		int cpSequenceNumber) throws Exception {
 
 		// trigger the checkpoint
-		coord.triggerCheckpoint(false);
+		checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 
-		assertEquals(1, coord.getPendingCheckpoints().size());
-		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		assertEquals(1, checkpointCoordinator.getPendingCheckpoints().size());
+		long checkpointId = Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());
 
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 
@@ -2445,13 +2445,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
 
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-				jid,
+				jobId,
 				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
 				checkpointId,
 				new CheckpointMetrics(),
 				taskStateSnapshot);
 
-			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
+			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
 		}
 	}
 


[flink] 03/07: [hotfix] Throws the causing exception if a future is completed exceptionally unexpectedly.

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9c73ce4a4a9d1065a2a9a7f124db3636b9be8444
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 23:14:07 2020 +0800

    [hotfix] Throws the causing exception if a future is completed exceptionally unexpectedly.
---
 .../flink/runtime/concurrent/FutureUtils.java      | 12 +++++++
 .../checkpoint/CheckpointCoordinatorTest.java      | 37 +++++++++++-----------
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 74f44e3..7125b6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -1150,6 +1150,18 @@ public class FutureUtils {
 			executor);
 	}
 
+	/**
+	 * Throws the causing exception if the given future is completed exceptionally, otherwise does nothing.
+	 *
+	 * @param future the future to check.
+	 * @throws Exception when the future is completed exceptionally.
+	 */
+	public static void throwIfCompletedExceptionally(CompletableFuture<?> future) throws Exception {
+		if (future.isCompletedExceptionally()) {
+			future.get();
+		}
+	}
+
 	private static <T> BiConsumer<T, Throwable> forwardTo(CompletableFuture<T> target) {
 		return (value, throwable) -> {
 			if (throwable != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 60cd9ce..3207164 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -265,7 +266,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkPointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkPointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkPointFuture);
 
 			long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
@@ -346,7 +347,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 			// validate that we have a pending checkpoint
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -434,12 +435,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture1.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
 			// trigger second checkpoint, should also succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture2.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
 			// validate that we have a pending checkpoint
 			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -548,7 +549,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 			// validate that we have a pending checkpoint
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -707,7 +708,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture1.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -726,7 +727,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture2.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
 			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -833,7 +834,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture1.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -868,7 +869,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
 				checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture2.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
 			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -990,7 +991,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger a checkpoint, partially acknowledged
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1053,7 +1054,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 			long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
 
@@ -1115,7 +1116,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
@@ -1374,11 +1375,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-		assertFalse(checkpointFuture1.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
 		CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture2.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 		long checkpointId2 = counter.getLast();
 		assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
@@ -1394,13 +1395,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		CompletableFuture<CompletedCheckpoint> checkpointFuture3 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture3.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture3);
 		assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		CompletableFuture<CompletedCheckpoint> savepointFuture2 = checkpointCoordinator.triggerSavepoint(savepointDir);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		long savepointId2 = counter.getLast();
-		assertFalse(savepointFuture2.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(savepointFuture2);
 		assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
@@ -1736,7 +1737,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			CompletableFuture<CompletedCheckpoint> checkpointFuture =
 				checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 			for (PendingCheckpoint checkpoint : checkpointCoordinator.getPendingCheckpoints().values()) {
 				CheckpointProperties props = checkpoint.getProps();
@@ -1963,7 +1964,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CompletableFuture<CompletedCheckpoint> checkpointFuture =
 			checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 		verify(tracker, times(1))
 			.reportPendingCheckpoint(eq(1L), any(Long.class), eq(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)));
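With FutureUtils.throwIfCompletedExceptionally, a test whose trigger future failed reports the underlying failure instead of a bare assertion error. Note that CompletableFuture.get() rethrows a failure wrapped in an ExecutionException (and a cancelled future, which also counts as completed exceptionally, raises a CancellationException), so the "causing exception" reaches the test as the cause of that wrapper. A small standalone sketch follows; the demo class name is made up, and the helper body is copied from the diff above so the example compiles without Flink on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThrowIfCompletedExceptionallyDemo {

    // Same logic as the helper this commit adds to FutureUtils.
    static void throwIfCompletedExceptionally(CompletableFuture<?> future) throws Exception {
        if (future.isCompletedExceptionally()) {
            future.get(); // rethrows the failure, wrapped in an ExecutionException
        }
    }

    public static void main(String[] args) throws Exception {
        // A successfully completed future and a still-pending one: nothing is thrown.
        throwIfCompletedExceptionally(CompletableFuture.completedFuture("ok"));
        throwIfCompletedExceptionally(new CompletableFuture<>());

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("checkpoint declined"));
        try {
            throwIfCompletedExceptionally(failed);
        } catch (ExecutionException e) {
            // The original cause is preserved, unlike assertFalse(future.isCompletedExceptionally()).
            System.out.println(e.getCause().getMessage()); // prints "checkpoint declined"
        }
    }
}
```

Because a pending future is left alone, the helper is only a meaningful check after the executor has been drained, which is why each call in the tests follows manuallyTriggeredScheduledExecutor.triggerAll().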


[flink] 04/07: [FLINK-18641][checkpointing] Fix CheckpointCoordinator to work with ExternallyInducedSource.

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8c0bccaca33f2b6ddda73fa20f6ea6604ccd6b52
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Aug 5 20:37:07 2020 +0800

    [FLINK-18641][checkpointing] Fix CheckpointCoordinator to work with ExternallyInducedSource.
    
    This patch fixes the CheckpointCoordinator to make it work with
    ExternallyInducedSource in cases where the task snapshots are triggered
    by the external systems via master hooks, rather than the checkpoint
    coordinator.
    
    The problem in the current code is that when the task snapshots are
    triggered externally via the master hooks, the checkpoint coordinator
    may receive all the acks from the tasks before the master state snapshot
    completes, which leads to a checkpoint failure. The fix is to only
    finalize the checkpoint once the operator coordinator checkpoints, the
    master state snapshots and the task snapshots have all been taken.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  29 +++++-
 .../checkpoint/CheckpointCoordinatorTest.java      | 112 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 65b57c5..501322e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -574,11 +574,14 @@ public class CheckpointCoordinator {
 										request.advanceToEndOfTime);
 
 									coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
+									// It is possible that the tasks have already finished checkpointing at this point.
+									// So we need to complete this pending checkpoint.
+									if (!maybeCompleteCheckpoint(checkpoint)) {
+										return null;
+									}
 									onTriggerSuccess();
 								}
 							}
-
 							return null;
 						},
 						timer)
@@ -845,6 +848,26 @@ public class CheckpointCoordinator {
 		}
 	}
 
+	// Returns true if the checkpoint is successfully completed, false otherwise.
+	private boolean maybeCompleteCheckpoint(PendingCheckpoint checkpoint) {
+		synchronized (lock) {
+			if (checkpoint.isFullyAcknowledged()) {
+				try {
+					// we need to check inside the lock for being shutdown as well,
+					// otherwise we get races and invalid error log messages.
+					if (shutdown) {
+						return false;
+					}
+					completePendingCheckpoint(checkpoint);
+				} catch (CheckpointException ce) {
+					onTriggerFailure(checkpoint, ce);
+					return false;
+				}
+			}
+		}
+		return true;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Handling checkpoints and messages
 	// --------------------------------------------------------------------------------------------
@@ -955,7 +978,7 @@ public class CheckpointCoordinator {
 						LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
 							checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
 
-						if (checkpoint.areTasksFullyAcknowledged()) {
+						if (checkpoint.isFullyAcknowledged()) {
 							completePendingCheckpoint(checkpoint);
 						}
 						break;
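The commit message above describes the fix as a completion rule: a checkpoint may only be finalized once the operator coordinator checkpoints, the master state and all task acknowledgements are in, and the check has to run both when a task acknowledges and when the trigger path finishes. The Java sketch below models only that rule. CompletionOrderSketch and its methods are hypothetical names; the sketch collapses coordinator and master state into one flag and omits the shutdown check, failure reporting and state handles that CheckpointCoordinator deals with.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model of the completion rule: finalize only when the task acks
 * AND the coordinator/master-hook snapshots are all in, in whatever order they arrive.
 */
public class CompletionOrderSketch {
    private final Object lock = new Object();
    private final Set<String> notYetAcknowledgedTasks = new HashSet<>();
    private boolean masterAndCoordinatorStateDone;
    private boolean completed;
    private int completions;

    public CompletionOrderSketch(Set<String> tasks) {
        notYetAcknowledgedTasks.addAll(tasks);
    }

    // Similar in spirit to isFullyAcknowledged(): task acks alone are not enough.
    private boolean isFullyAcknowledged() {
        return masterAndCoordinatorStateDone && notYetAcknowledgedTasks.isEmpty();
    }

    /** Ack path: a task reports its snapshot, possibly before the master hooks finish. */
    public void acknowledgeTask(String task) {
        synchronized (lock) {
            notYetAcknowledgedTasks.remove(task);
            maybeComplete();
        }
    }

    /** Trigger path: coordinator checkpoints and master hook snapshots have finished. */
    public void masterAndCoordinatorStateCompleted() {
        synchronized (lock) {
            masterAndCoordinatorStateDone = true;
            maybeComplete();
        }
    }

    // Called with the lock held; completes the checkpoint at most once.
    private void maybeComplete() {
        if (!completed && isFullyAcknowledged()) {
            completed = true;
            completions++;
        }
    }

    public boolean isCompleted() {
        synchronized (lock) {
            return completed;
        }
    }

    public int completions() {
        synchronized (lock) {
            return completions;
        }
    }

    public static void main(String[] args) {
        // The order exercised by testTaskCheckpointTriggeredByMasterHooks:
        // all tasks ack first, the master state completes afterwards.
        CompletionOrderSketch checkpoint =
            new CompletionOrderSketch(new HashSet<>(Arrays.asList("task-1", "task-2")));
        checkpoint.acknowledgeTask("task-1");
        checkpoint.acknowledgeTask("task-2");
        System.out.println(checkpoint.isCompleted()); // prints false
        checkpoint.masterAndCoordinatorStateCompleted();
        System.out.println(checkpoint.isCompleted()); // prints true
    }
}
```

Checking only the task side on the ack path (the old areTasksFullyAcknowledged() condition) would finalize too early in this ordering; checking the full condition on both paths makes the outcome independent of arrival order.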
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3207164..eef540c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -63,6 +64,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.verification.VerificationMode;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -79,6 +81,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -2304,6 +2307,115 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Test that the checkpoint still behaves correctly when the task checkpoint is triggered by the
+	 * master hooks and finished before the master checkpoint.
+	 */
+	@Test
+	public void testTaskCheckpointTriggeredByMasterHooks() throws Exception {
+		final JobID jobId = new JobID();
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
+			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
+			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
+		AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
+		OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+		OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+		TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
+		TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
+		OperatorSubtaskState subtaskState1 = new OperatorSubtaskState();
+		OperatorSubtaskState subtaskState2 = new OperatorSubtaskState();
+		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
+		taskOperatorSubtaskStates2.putSubtaskStateByOperatorID(opID2, subtaskState2);
+
+		// Add a master hook which triggers and acks the task checkpoint immediately.
+		// In this case the task checkpoints would complete before the job master checkpoint completes.
+		checkpointCoordinator.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
+			@Override
+			public String getIdentifier() {
+				return "anything";
+			}
+
+			@Override
+			@Nullable
+			public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+				// Acknowledge the checkpoint in the master hooks so the task snapshots complete before
+				// the master state snapshot completes.
+				checkpointIdRef.set(checkpointId);
+				AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
+					jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
+				AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
+					jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+				checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+				checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+				return null;
+			}
+
+			@Override
+			public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
+				// Nothing to restore: this test hook carries no state.
+			}
+
+			@Override
+			public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
+				return new SimpleVersionedSerializer<Integer>() {
+					@Override
+					public int getVersion() {
+						return 0;
+					}
+
+					@Override
+					public byte[] serialize(Integer obj) throws IOException {
+						return new byte[0];
+					}
+
+					@Override
+					public Integer deserialize(int version, byte[] serialized) throws IOException {
+						return 1;
+					}
+				};
+			}
+		});
+
+		// Verify initial state.
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
+
+		// trigger the first checkpoint. this should succeed
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
+		manuallyTriggeredScheduledExecutor.triggerAll();
+		assertFalse(checkpointFuture.isCompletedExceptionally());
+
+		// now we should have a completed checkpoint
+		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+		// the canceler should be removed now
+		assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
+
+		// validate that both tasks received the checkpoint trigger message
+		long checkpointId = checkpointIdRef.get();
+		verify(vertex1.getCurrentExecutionAttempt(),
+			times(1)).triggerCheckpoint(eq(checkpointId), any(Long.class), any(CheckpointOptions.class));
+		verify(vertex2.getCurrentExecutionAttempt(),
+			times(1)).triggerCheckpoint(eq(checkpointId), any(Long.class), any(CheckpointOptions.class));
+
+		CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+		assertEquals(jobId, success.getJobId());
+		assertEquals(2, success.getOperatorStates().size());
+
+		checkpointCoordinator.shutdown(JobStatus.FINISHED);
+	}
+
 	private CheckpointCoordinator getCheckpointCoordinator(
 		JobID jobId,
 		ExecutionVertex vertex1,

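The ordering exercised by `testTaskCheckpointTriggeredByMasterHooks` (task acknowledgements arriving from inside a master hook, before the master state exists) can be illustrated outside Flink. The sketch below is a hypothetical, simplified model, not Flink's `PendingCheckpoint`: the class `PendingCheckpointSketch` and its methods are invented for illustration. It assumes a checkpoint may complete only once every task has acknowledged and the master state has been recorded, and shows that early task acks leave the checkpoint pending instead of completing it prematurely.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a pending checkpoint (not Flink's API).
// It completes only after all tasks have acknowledged AND the master state is
// recorded, regardless of which of the two happens first.
class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private boolean masterStateRecorded;
    final CompletableFuture<Long> completion = new CompletableFuture<>();

    PendingCheckpointSketch(long checkpointId, Set<String> taskIds) {
        this.checkpointId = checkpointId;
        // Copy so that removals never touch the caller's set.
        this.notYetAcknowledged = new HashSet<>(taskIds);
    }

    synchronized void acknowledgeTask(String taskId) {
        notYetAcknowledged.remove(taskId);
        tryComplete();
    }

    synchronized void recordMasterState() {
        masterStateRecorded = true;
        tryComplete();
    }

    // Single completion check shared by both paths, so arrival order does not matter.
    private void tryComplete() {
        if (masterStateRecorded && notYetAcknowledged.isEmpty()) {
            completion.complete(checkpointId);
        }
    }

    public static void main(String[] args) {
        Set<String> tasks = new HashSet<>(Arrays.asList("task-1", "task-2"));
        PendingCheckpointSketch pending = new PendingCheckpointSketch(1L, tasks);

        // A "master hook" that acks every task before the master state exists,
        // mirroring the hook registered in the test above.
        Runnable masterHook = () -> tasks.forEach(pending::acknowledgeTask);
        masterHook.run();
        System.out.println(pending.completion.isDone()); // false: still pending

        pending.recordMasterState();
        System.out.println(pending.completion.join()); // 1
    }
}
```

Routing both events through one `tryComplete` check is the design choice that makes the arrival order irrelevant in this model; how Flink itself tracks these conditions lives in `CheckpointCoordinator` and `PendingCheckpoint`, not in this sketch.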

[flink] 07/07: [hotfix] Add unit test for checkpoint failure.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 99cd44f2038d43d2ecfdf98f16882dfd82f4ebb4
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 22:47:55 2020 +0800

    [hotfix] Add unit test for checkpoint failure.
---
 .../checkpoint/CheckpointCoordinatorTest.java      | 126 +++++++++++++++++++++
 1 file changed, 126 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 16d7136..78e5336 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -38,6 +38,9 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -50,6 +53,9 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.util.ExceptionUtils;
@@ -2436,6 +2442,126 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		checkpointCoordinator.shutdown(JobStatus.FINISHED);
 	}
 
+	@Test
+	public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
+		final JobID jobId = new JobID();
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
+			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
+			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+		OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+		OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+		TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
+		TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
+		OperatorSubtaskState subtaskState1 = new OperatorSubtaskState();
+		OperatorSubtaskState subtaskState2 = new OperatorSubtaskState();
+		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
+		taskOperatorSubtaskStates2.putSubtaskStateByOperatorID(opID2, subtaskState2);
+
+		// Create a mock OperatorCoordinatorCheckpointContext which completes the checkpoint immediately.
+		AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
+		OperatorCoordinatorCheckpointContext coordinatorCheckpointContext =
+			new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
+				.setOnCallingCheckpointCoordinator((checkpointId, result) -> {
+					coordCheckpointDone.set(true);
+					result.complete(new byte[0]);
+				})
+				.setOperatorID(opID1)
+				.build();
+
+		// set up the coordinator with a state backend that fails to write the checkpoint metadata
+		CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder()
+			.setJobId(jobId)
+			.setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
+			.setCheckpointCoordinatorConfiguration(
+				CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
+			.setTimer(manuallyTriggeredScheduledExecutor)
+			.setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext))
+			.setStateBackEnd(new MemoryStateBackend() {
+				private static final long serialVersionUID = 8134582566514272546L;
+
+				// Throw exception when finalizing the checkpoint.
+				@Override
+				public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+					return new MemoryBackendCheckpointStorage(jobId, null, null, 100) {
+						@Override
+						public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+							return new NonPersistentMetadataCheckpointStorageLocation(1000) {
+								@Override
+								public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
+									throw new IOException("Artificial Exception");
+								}
+							};
+						}
+					};
+				}
+			})
+			.build();
+		AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
+		// Add a master hook which triggers and acks the task checkpoint immediately.
+		// In this case the task checkpoints would complete before the job master checkpoint completes.
+		checkpointCoordinator.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
+			@Override
+			public String getIdentifier() {
+				return "anything";
+			}
+
+			@Override
+			@Nullable
+			public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+				assertTrue("The coordinator checkpoint should have finished.", coordCheckpointDone.get());
+				// Acknowledge the checkpoint in the master hooks so the task snapshots complete before
+				// the master state snapshot completes.
+				checkpointIdRef.set(checkpointId);
+				AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
+					jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
+				AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
+					jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+				checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+				checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+				return null;
+			}
+
+			@Override
+			public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
+				// Nothing to restore: this test hook carries no state.
+			}
+
+			@Override
+			public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
+				return new SimpleVersionedSerializer<Integer>() {
+					@Override
+					public int getVersion() {
+						return 0;
+					}
+
+					@Override
+					public byte[] serialize(Integer obj) throws IOException {
+						return new byte[0];
+					}
+
+					@Override
+					public Integer deserialize(int version, byte[] serialized) throws IOException {
+						return 1;
+					}
+				};
+			}
+		});
+
+		// trigger the first checkpoint; it should fail because writing the checkpoint metadata throws
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
+		manuallyTriggeredScheduledExecutor.triggerAll();
+
+		assertTrue(checkpointFuture.isCompletedExceptionally());
+		assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
+	}
+
 	private CheckpointCoordinator getCheckpointCoordinator(
 		JobID jobId,
 		ExecutionVertex vertex1,
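
`testCompleteCheckpointFailureWithExternallyInducedSource` makes `createMetadataOutputStream()` throw an `IOException` and expects the trigger future to complete exceptionally with no successful checkpoint retained. The following is a hypothetical sketch of that contract only: `FinalizeFailureSketch`, `MetadataWriter` and `completeCheckpoint` are invented names, and this is not how `CheckpointCoordinator` is structured internally.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not Flink code: finalizing a checkpoint writes its
// metadata; if that write throws, the returned future completes exceptionally
// and nothing is added to the list of successful checkpoints.
class FinalizeFailureSketch {

    /** Hypothetical stand-in for a storage location's metadata writer. */
    interface MetadataWriter {
        void write(long checkpointId) throws IOException;
    }

    final List<Long> successfulCheckpoints = new ArrayList<>();

    CompletableFuture<Long> completeCheckpoint(long checkpointId, MetadataWriter writer) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        try {
            writer.write(checkpointId);
            // Register the checkpoint only after its metadata was written.
            successfulCheckpoints.add(checkpointId);
            result.complete(checkpointId);
        } catch (IOException e) {
            // Do not register anything; surface the cause to the caller.
            result.completeExceptionally(e);
        }
        return result;
    }

    public static void main(String[] args) {
        FinalizeFailureSketch coordinator = new FinalizeFailureSketch();
        CompletableFuture<Long> future = coordinator.completeCheckpoint(1L, id -> {
            throw new IOException("Artificial Exception");
        });
        System.out.println(future.isCompletedExceptionally()); // true
        System.out.println(coordinator.successfulCheckpoints.isEmpty()); // true
    }
}
```

Registering the checkpoint only after the metadata write succeeds is the point being illustrated: a failed finalization leaves no partially registered checkpoint behind, which is what the two final assertions of the test check.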