You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/09/08 21:39:14 UTC
[flink] 02/07: [hotfix] Use camel format to replace abbreviations
for the variable names.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 44797b7fef5b226e1b494e4c8590a1052466c07e
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 22:59:28 2020 +0800
[hotfix] Use camel format to replace abbreviations for the variable names.
---
.../checkpoint/CheckpointCoordinatorTest.java | 590 ++++++++++-----------
1 file changed, 295 insertions(+), 295 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index eb96a1b..60cd9ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -169,22 +169,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
try {
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord = getCheckpointCoordinator();
+ CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
// nothing should be happening
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// trigger the first checkpoint. this should not succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertTrue(checkpointFuture.isCompletedExceptionally());
// still, nothing should be happening
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -195,22 +195,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testCheckpointAbortsIfTriggerTasksAreFinished() {
try {
- CheckpointCoordinator coord = getCheckpointCoordinator();
+ CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
// nothing should be happening
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// trigger the first checkpoint. this should not succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertTrue(checkpointFuture.isCompletedExceptionally());
// still, nothing should be happening
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -221,22 +221,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
try {
- CheckpointCoordinator coord = getCheckpointCoordinator();
+ CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
// nothing should be happening
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// trigger the first checkpoint. this should not succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertTrue(checkpointFuture.isCompletedExceptionally());
// still, nothing should be happening
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -246,7 +246,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -259,24 +259,24 @@ public class CheckpointCoordinatorTest extends TestLogger {
CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
// set up the coordinator
- CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
+ CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2, checkpointFailureManager);
try {
// trigger the checkpoint. this should succeed
- final CompletableFuture<CompletedCheckpoint> checkPointFuture = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkPointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkPointFuture.isCompletedExceptionally());
- long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+ long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+ PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
// acknowledge from one of the tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.areTasksFullyAcknowledged());
// decline checkpoint from the other task
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
fail("Test failed.");
}
@@ -286,7 +286,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
assertEquals(errorMsg, e.getMessage());
} finally {
try {
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -302,13 +302,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
final String errorMsg = "Exceeded checkpoint failure tolerance number!";
CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
- CheckpointCoordinator coord = getCheckpointCoordinator(new JobID(), vertex1, vertex2, checkpointFailureManager);
+ CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(new JobID(), vertex1, vertex2, checkpointFailureManager);
try {
- coord.triggerCheckpoint(false);
+ checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- coord.abortPendingCheckpoints(new CheckpointException(CHECKPOINT_EXPIRED));
+ checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CHECKPOINT_EXPIRED));
fail("Test failed.");
}
@@ -317,7 +317,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
assertTrue(e instanceof RuntimeException);
assertEquals(errorMsg, e.getMessage());
} finally {
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
}
@@ -329,7 +329,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testTriggerAndDeclineCheckpointSimple() {
try {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -338,29 +338,29 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+ CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// trigger the first checkpoint. this should succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture.isCompletedExceptionally());
// validate that we have a pending checkpoint
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// we have one task scheduled that will cancel after timeout
assertEquals(1, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
- long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+ long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+ PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
assertNotNull(checkpoint);
assertEquals(checkpointId, checkpoint.getCheckpointId());
- assertEquals(jid, checkpoint.getJobId());
+ assertEquals(jobId, checkpoint.getJobId());
assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpoint.getOperatorStates().size());
@@ -372,36 +372,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, checkpoint.getCheckpointTimestamp(), CheckpointOptions.forCheckpointWithDefaultLocation());
// acknowledge from one of the tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location");
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), "Unknown location");
assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.areTasksFullyAcknowledged());
// acknowledge the same task again (should not matter)
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location");
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), "Unknown location");
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.areTasksFullyAcknowledged());
// decline checkpoint from the other task, this should cancel the checkpoint
// and trigger a new one
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
assertTrue(checkpoint.isDiscarded());
// the canceler is also removed
assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
// validate that we have no new pending checkpoint
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// decline again, nothing should happen
// decline from the other task, nothing should happen
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
assertTrue(checkpoint.isDiscarded());
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -417,7 +417,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testTriggerAndDeclineCheckpointComplex() {
try {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -425,36 +425,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+ CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
// trigger the first checkpoint. this should succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture1.isCompletedExceptionally());
// trigger second checkpoint, should also succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture2.isCompletedExceptionally());
// validate that we have a pending checkpoint
- assertEquals(2, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(2, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
- Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
+ Iterator<Map.Entry<Long, PendingCheckpoint>> it = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator();
long checkpoint1Id = it.next().getKey();
long checkpoint2Id = it.next().getKey();
- PendingCheckpoint checkpoint1 = coord.getPendingCheckpoints().get(checkpoint1Id);
- PendingCheckpoint checkpoint2 = coord.getPendingCheckpoints().get(checkpoint2Id);
+ PendingCheckpoint checkpoint1 = checkpointCoordinator.getPendingCheckpoints().get(checkpoint1Id);
+ PendingCheckpoint checkpoint2 = checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);
assertNotNull(checkpoint1);
assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
- assertEquals(jid, checkpoint1.getJobId());
+ assertEquals(jobId, checkpoint1.getJobId());
assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpoint1.getOperatorStates().size());
@@ -463,7 +463,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
assertNotNull(checkpoint2);
assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
- assertEquals(jid, checkpoint2.getJobId());
+ assertEquals(jobId, checkpoint2.getJobId());
assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpoint2.getOperatorStates().size());
@@ -483,25 +483,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
// decline checkpoint from one of the tasks, this should cancel the checkpoint
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
assertTrue(checkpoint1.isDiscarded());
// validate that we have only one pending checkpoint left
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(1, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
// validate that it is the same second checkpoint from earlier
- long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
+ long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+ PendingCheckpoint checkpointNew = checkpointCoordinator.getPendingCheckpoints().get(checkpointIdNew);
assertEquals(checkpoint2Id, checkpointIdNew);
assertNotNull(checkpointNew);
assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
- assertEquals(jid, checkpointNew.getJobId());
+ assertEquals(jobId, checkpointNew.getJobId());
assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpointNew.getOperatorStates().size());
@@ -511,15 +511,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
// decline again, nothing should happen
// decline from the other task, nothing should happen
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
assertTrue(checkpoint1.isDiscarded());
// will not notify abort message again
verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -530,7 +530,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testTriggerAndConfirmSimpleCheckpoint() {
try {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -539,28 +539,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+ CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
// trigger the first checkpoint. this should succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture.isCompletedExceptionally());
// validate that we have a pending checkpoint
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(1, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
- long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+ long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+ PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
assertNotNull(checkpoint);
assertEquals(checkpointId, checkpoint.getCheckpointId());
- assertEquals(jid, checkpoint.getJobId());
+ assertEquals(jobId, checkpoint.getJobId());
assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpoint.getOperatorStates().size());
@@ -583,8 +583,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
// acknowledge from one of the tasks
- AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
- coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+ AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
assertFalse(checkpoint.isDiscarded());
@@ -592,21 +592,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(taskOperatorSubtaskStates2, never()).registerSharedStates(any(SharedStateRegistry.class));
// acknowledge the same task again (should not matter)
- coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.areTasksFullyAcknowledged());
verify(subtaskState2, never()).registerSharedStates(any(SharedStateRegistry.class));
// acknowledge the other task.
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
// the checkpoint is internally converted to a successful checkpoint and the
// pending checkpoint object is disposed
assertTrue(checkpoint.isDiscarded());
// the now we should have a completed checkpoint
- assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
// the canceler should be removed now
assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
@@ -623,27 +623,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), any(Long.class), any(CheckpointOptions.class));
}
- CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
- assertEquals(jid, success.getJobId());
+ CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+ assertEquals(jobId, success.getJobId());
assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
assertEquals(2, success.getOperatorStates().size());
// ---------------
// trigger another checkpoint and see that this one replaces the other checkpoint
// ---------------
- coord.triggerCheckpoint(false);
+ checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+ long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
- CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
- assertEquals(jid, successNew.getJobId());
+ CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+ assertEquals(jobId, successNew.getJobId());
assertEquals(checkpointIdNew, successNew.getCheckpointID());
assertTrue(successNew.getOperatorStates().isEmpty());
@@ -656,7 +656,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), any(Long.class));
}
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -667,7 +667,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testMultipleConcurrentCheckpoints() {
try {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock execution vertices
@@ -690,9 +690,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
- .setJobId(jid)
+ .setJobId(jobId)
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
.setTasksToTrigger(new ExecutionVertex[] { triggerVertex1, triggerVertex2 })
.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 })
@@ -701,18 +701,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// trigger the first checkpoint. this should succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture1.isCompletedExceptionally());
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+ PendingCheckpoint pending1 = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
long checkpointId1 = pending1.getCheckpointId();
// trigger messages should have been sent
@@ -720,20 +720,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), any(Long.class), any(CheckpointOptions.class));
// acknowledge one of the three tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId1), TASK_MANAGER_LOCATION_INFO);
// start the second checkpoint
// trigger the first checkpoint. this should succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture2.isCompletedExceptionally());
- assertEquals(2, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
PendingCheckpoint pending2;
{
- Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+ Iterator<PendingCheckpoint> all = checkpointCoordinator.getPendingCheckpoints().values().iterator();
PendingCheckpoint cc1 = all.next();
PendingCheckpoint cc2 = all.next();
pending2 = pending1 == cc1 ? cc2 : cc1;
@@ -746,44 +746,44 @@ public class CheckpointCoordinatorTest extends TestLogger {
// we acknowledge the remaining two tasks from the first
// checkpoint and two tasks from the second checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId1), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId1), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
// now, the first checkpoint should be confirmed
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertTrue(pending1.isDiscarded());
// the first confirm message should be out
verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId1), any(Long.class));
// send the last remaining ack for the second checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId2), TASK_MANAGER_LOCATION_INFO);
// now, the second checkpoint should be confirmed
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertTrue(pending2.isDiscarded());
// the second commit message should be out
verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), any(Long.class));
// validate the committed checkpoints
- List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+ List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
CompletedCheckpoint sc1 = scs.get(0);
assertEquals(checkpointId1, sc1.getCheckpointID());
- assertEquals(jid, sc1.getJobId());
+ assertEquals(jobId, sc1.getJobId());
assertTrue(sc1.getOperatorStates().isEmpty());
CompletedCheckpoint sc2 = scs.get(1);
assertEquals(checkpointId2, sc2.getCheckpointID());
- assertEquals(jid, sc2.getJobId());
+ assertEquals(jobId, sc2.getJobId());
assertTrue(sc2.getOperatorStates().isEmpty());
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -794,7 +794,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testSuccessfulCheckpointSubsumesUnsuccessful() {
try {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock execution vertices
final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
@@ -816,9 +816,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
- .setJobId(jid)
+ .setJobId(jobId)
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
.setTasksToTrigger(new ExecutionVertex[] { triggerVertex1, triggerVertex2 })
.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 })
@@ -827,18 +827,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// trigger the first checkpoint. this should succeed
- final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture1.isCompletedExceptionally());
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+ PendingCheckpoint pending1 = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
long checkpointId1 = pending1.getCheckpointId();
// trigger messages should have been sent
@@ -861,21 +861,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
taskOperatorSubtaskStates13.putSubtaskStateByOperatorID(opID3, subtaskState13);
// acknowledge one of the three tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates12), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates12), TASK_MANAGER_LOCATION_INFO);
// start the second checkpoint
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
- coord.triggerCheckpoint(false);
+ checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture2.isCompletedExceptionally());
- assertEquals(2, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
PendingCheckpoint pending2;
{
- Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+ Iterator<PendingCheckpoint> all = checkpointCoordinator.getPendingCheckpoints().values().iterator();
PendingCheckpoint cc1 = all.next();
PendingCheckpoint cc2 = all.next();
pending2 = pending1 == cc1 ? cc2 : cc1;
@@ -901,13 +901,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
// we acknowledge one more task from the first checkpoint and the second
// checkpoint completely. The second checkpoint should then subsume the first checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates23), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates23), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates21), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates21), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates11), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates11), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates22), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates22), TASK_MANAGER_LOCATION_INFO);
// now, the second checkpoint should be confirmed, and the first discarded
// actually both pending checkpoints are discarded, and the second has been transformed
@@ -915,8 +915,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
assertTrue(pending1.isDiscarded());
assertTrue(pending2.isDiscarded());
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// validate that all received subtask states in the first checkpoint have been discarded
verify(subtaskState11, times(1)).discardState();
@@ -928,20 +928,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(subtaskState23, never()).discardState();
// validate the committed checkpoints
- List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+ List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
CompletedCheckpoint success = scs.get(0);
assertEquals(checkpointId2, success.getCheckpointID());
- assertEquals(jid, success.getJobId());
+ assertEquals(jobId, success.getJobId());
assertEquals(3, success.getOperatorStates().size());
// the first confirm message should be out
verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), any(Long.class));
// send the last remaining ack for the first checkpoint. This should not do anything
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates13), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates13), TASK_MANAGER_LOCATION_INFO);
verify(subtaskState13, times(1)).discardState();
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
// validate that the states in the second checkpoint have been discarded
verify(subtaskState21, times(1)).discardState();
@@ -958,7 +958,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testCheckpointTimeoutIsolated() {
try {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock execution vertices
@@ -977,9 +977,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
// set up the coordinator
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
- .setJobId(jid)
+ .setJobId(jobId)
.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2 })
.setTasksToCommitTo(new ExecutionVertex[] { commitVertex })
@@ -988,12 +988,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
.build();
// trigger a checkpoint, partially acknowledged
- final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture.isCompletedExceptionally());
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
+ PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
assertFalse(checkpoint.isDiscarded());
OperatorID opID1 = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
@@ -1002,13 +1002,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorSubtaskState subtaskState1 = mock(OperatorSubtaskState.class);
taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
// triggers cancelling
manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// validate that the received states have been discarded
verify(subtaskState1, times(1)).discardState();
@@ -1016,7 +1016,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// no confirm message must have been sent
verify(commitVertex.getCurrentExecutionAttempt(), times(0)).notifyCheckpointComplete(anyLong(), anyLong());
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -1027,7 +1027,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testHandleMessagesForNonExistingCheckpoints() {
try {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock execution vertices and trigger some checkpoint
@@ -1041,9 +1041,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
- .setJobId(jid)
+ .setJobId(jobId)
.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
.setTasksToWaitFor(new ExecutionVertex[] { ackVertex1, ackVertex2 })
.setTasksToCommitTo(new ExecutionVertex[] { commitVertex })
@@ -1051,26 +1051,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
- final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture.isCompletedExceptionally());
- long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+ long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
// send some messages that do not belong to either the job or the any
// of the vertices that need to be acknowledged.
// non of the messages should throw an exception
// wrong job id
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
// unknown checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, 1L), TASK_MANAGER_LOCATION_INFO);
// unknown ack vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId), TASK_MANAGER_LOCATION_INFO);
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -1103,7 +1103,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new CheckpointCoordinatorConfigurationBuilder()
.setMaxConcurrentCheckpoints(1)
.build();
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setJobId(jobId)
.setCheckpointCoordinatorConfiguration(chkConfig)
@@ -1113,13 +1113,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
- final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture.isCompletedExceptionally());
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+ PendingCheckpoint pendingCheckpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
long checkpointId = pendingCheckpoint.getCheckpointId();
@@ -1130,7 +1130,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
taskOperatorSubtaskStatesTrigger.putSubtaskStateByOperatorID(opIDtrigger, subtaskStateTrigger);
// acknowledge the first trigger vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), TASK_MANAGER_LOCATION_INFO);
// verify that the subtask state has not been discarded
verify(subtaskStateTrigger, never()).discardState();
@@ -1138,7 +1138,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
TaskStateSnapshot unknownSubtaskState = mock(TaskStateSnapshot.class);
// receive an acknowledge message for an unknown vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState), TASK_MANAGER_LOCATION_INFO);
// we should discard acknowledge messages from an unknown vertex belonging to our job
verify(unknownSubtaskState, times(1)).discardState();
@@ -1146,21 +1146,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
TaskStateSnapshot differentJobSubtaskState = mock(TaskStateSnapshot.class);
// receive an acknowledge message from an unknown job
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
// we should not interfere with different jobs
verify(differentJobSubtaskState, never()).discardState();
// duplicate acknowledge message for the trigger vertex
TaskStateSnapshot triggerSubtaskState = mock(TaskStateSnapshot.class);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState), TASK_MANAGER_LOCATION_INFO);
// duplicate acknowledge messages for a known vertex should not trigger discarding the state
verify(triggerSubtaskState, never()).discardState();
// let the checkpoint fail at the first ack vertex
reset(subtaskStateTrigger);
- coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId), TASK_MANAGER_LOCATION_INFO);
assertTrue(pendingCheckpoint.isDiscarded());
@@ -1170,14 +1170,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
TaskStateSnapshot ackSubtaskState = mock(TaskStateSnapshot.class);
// late acknowledge message from the second ack vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState), TASK_MANAGER_LOCATION_INFO);
// check that we also cleaned up this state
verify(ackSubtaskState, times(1)).discardState();
// receive an acknowledge message from an unknown job
reset(differentJobSubtaskState);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
// we should not interfere with different jobs
verify(differentJobSubtaskState, never()).discardState();
@@ -1185,7 +1185,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
TaskStateSnapshot unknownSubtaskState2 = mock(TaskStateSnapshot.class);
// receive an acknowledge message for an unknown vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2), TASK_MANAGER_LOCATION_INFO);
// we should discard acknowledge messages from an unknown vertex belonging to our job
verify(unknownSubtaskState2, times(1)).discardState();
@@ -1208,7 +1208,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -1217,26 +1217,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+ CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// trigger the first checkpoint. this should succeed
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
- CompletableFuture<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(savepointFuture.isDone());
// validate that we have a pending savepoint
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- PendingCheckpoint pending = coord.getPendingCheckpoints().get(checkpointId);
+ long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+ PendingCheckpoint pending = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
assertNotNull(pending);
assertEquals(checkpointId, pending.getCheckpointId());
- assertEquals(jid, pending.getJobId());
+ assertEquals(jobId, pending.getJobId());
assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
assertEquals(0, pending.getNumberOfAcknowledgedTasks());
assertEquals(0, pending.getOperatorStates().size());
@@ -1254,8 +1254,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
// acknowledge from one of the tasks
- AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
- coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+ AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
assertEquals(1, pending.getNumberOfAcknowledgedTasks());
assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
assertFalse(pending.isDiscarded());
@@ -1263,13 +1263,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
assertFalse(savepointFuture.isDone());
// acknowledge the same task again (should not matter)
- coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
assertFalse(pending.isDiscarded());
assertFalse(pending.areTasksFullyAcknowledged());
assertFalse(savepointFuture.isDone());
// acknowledge the other task.
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
// the checkpoint is internally converted to a successful checkpoint and the
// pending checkpoint object is disposed
@@ -1277,8 +1277,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
assertNotNull(savepointFuture.get());
// the now we should have a completed checkpoint
- assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
// validate that the relevant tasks got a confirmation message
{
@@ -1292,27 +1292,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(subtaskState2, times(1)).registerSharedStates(any(SharedStateRegistry.class));
}
- CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
- assertEquals(jid, success.getJobId());
+ CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+ assertEquals(jobId, success.getJobId());
assertEquals(pending.getCheckpointId(), success.getCheckpointID());
assertEquals(2, success.getOperatorStates().size());
// ---------------
// trigger another checkpoint and see that this one replaces the other checkpoint
// ---------------
- savepointFuture = coord.triggerSavepoint(savepointDir);
+ savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(savepointFuture.isDone());
- long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+ long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
- assertEquals(jid, successNew.getJobId());
+ CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+ assertEquals(jobId, successNew.getJobId());
assertEquals(checkpointIdNew, successNew.getCheckpointID());
assertTrue(successNew.getOperatorStates().isEmpty());
assertNotNull(savepointFuture.get());
@@ -1330,7 +1330,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), any(Long.class));
}
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
/**
@@ -1341,7 +1341,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
*/
@Test
public void testSavepointsAreNotSubsumed() throws Exception {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -1352,9 +1352,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
- .setJobId(jid)
+ .setJobId(jobId)
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
.setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
.setCheckpointIDCounter(counter)
@@ -1365,67 +1365,67 @@ public class CheckpointCoordinatorTest extends TestLogger {
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
// Trigger savepoint and checkpoint
- CompletableFuture<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepointFuture1 = checkpointCoordinator.triggerSavepoint(savepointDir);
manuallyTriggeredScheduledExecutor.triggerAll();
long savepointId1 = counter.getLast();
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- CompletableFuture<CompletedCheckpoint> checkpointFuture1 = coord.triggerCheckpoint(false);
+ CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(2, coord.getNumberOfPendingCheckpoints());
+ assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertFalse(checkpointFuture1.isCompletedExceptionally());
- CompletableFuture<CompletedCheckpoint> checkpointFuture2 = coord.triggerCheckpoint(false);
+ CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture2.isCompletedExceptionally());
long checkpointId2 = counter.getLast();
- assertEquals(3, coord.getNumberOfPendingCheckpoints());
+ assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
// 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+ assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDiscarded());
assertFalse(savepointFuture1.isDone());
- CompletableFuture<CompletedCheckpoint> checkpointFuture3 = coord.triggerCheckpoint(false);
+ CompletableFuture<CompletedCheckpoint> checkpointFuture3 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture3.isCompletedExceptionally());
- assertEquals(2, coord.getNumberOfPendingCheckpoints());
+ assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
- CompletableFuture<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepointFuture2 = checkpointCoordinator.triggerSavepoint(savepointDir);
manuallyTriggeredScheduledExecutor.triggerAll();
long savepointId2 = counter.getLast();
assertFalse(savepointFuture2.isCompletedExceptionally());
- assertEquals(3, coord.getNumberOfPendingCheckpoints());
+ assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
- assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDiscarded());
assertFalse(savepointFuture1.isDone());
assertNotNull(savepointFuture2.get());
// Ack first savepoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertEquals(3, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertNotNull(savepointFuture1.get());
}
private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
try {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock execution vertices and trigger some checkpoint
final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
@@ -1457,9 +1457,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setMinPauseBetweenCheckpoints(0L) // no extra delay
.setMaxConcurrentCheckpoints(maxConcurrentAttempts)
.build();
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
- .setJobId(jid)
+ .setJobId(jobId)
.setCheckpointCoordinatorConfiguration(chkConfig)
.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
.setTasksToWaitFor(new ExecutionVertex[] { ackVertex })
@@ -1468,7 +1468,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
- coord.startCheckpointScheduler();
+ checkpointCoordinator.startCheckpointScheduler();
for (int i = 0; i < maxConcurrentAttempts; i++) {
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
@@ -1481,7 +1481,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
.triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
// now, once we acknowledge one checkpoint, it should trigger the next one
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID, 1L), TASK_MANAGER_LOCATION_INFO);
final Collection<ScheduledFuture<?>> periodicScheduledTasks =
manuallyTriggeredScheduledExecutor.getPeriodicScheduledTask();
@@ -1498,7 +1498,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
assertEquals(maxConcurrentAttempts + 1, numCalls.get());
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -1510,7 +1510,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
public void testMaxConcurrentAttempsWithSubsumption() {
try {
final int maxConcurrentAttempts = 2;
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock execution vertices and trigger some checkpoint
final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
@@ -1528,9 +1528,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setMinPauseBetweenCheckpoints(0L) // no extra delay
.setMaxConcurrentCheckpoints(maxConcurrentAttempts)
.build();
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
- .setJobId(jid)
+ .setJobId(jobId)
.setCheckpointCoordinatorConfiguration(chkConfig)
.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
.setTasksToWaitFor(new ExecutionVertex[] { ackVertex })
@@ -1539,37 +1539,37 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
- coord.startCheckpointScheduler();
+ checkpointCoordinator.startCheckpointScheduler();
do {
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
}
- while (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
+ while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
// validate that the pending checkpoints are there
- assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
- assertNotNull(coord.getPendingCheckpoints().get(1L));
- assertNotNull(coord.getPendingCheckpoints().get(2L));
+ assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(1L));
+ assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(2L));
// now we acknowledge the second checkpoint, which should subsume the first checkpoint
// and allow two more checkpoints to be triggered
// now, once we acknowledge one checkpoint, it should trigger the next one
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID, 2L), TASK_MANAGER_LOCATION_INFO);
// after a while, there should be the new checkpoints
do {
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
}
- while (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
+ while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
// do the final check
- assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
- assertNotNull(coord.getPendingCheckpoints().get(3L));
- assertNotNull(coord.getPendingCheckpoints().get(4L));
+ assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L));
+ assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L));
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -1580,7 +1580,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testPeriodicSchedulingWithInactiveTasks() {
try {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
// create some mock execution vertices and trigger some checkpoint
final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
@@ -1601,9 +1601,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setMinPauseBetweenCheckpoints(0) // no extra delay
.setMaxConcurrentCheckpoints(2) // max two concurrent checkpoints
.build();
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
- .setJobId(jid)
+ .setJobId(jobId)
.setCheckpointCoordinatorConfiguration(chkConfig)
.setTasksToTrigger(new ExecutionVertex[] { triggerVertex })
.setTasksToWaitFor(new ExecutionVertex[] { ackVertex })
@@ -1612,12 +1612,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
- coord.startCheckpointScheduler();
+ checkpointCoordinator.startCheckpointScheduler();
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
// no checkpoint should have started so far
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
// now move the state to RUNNING
currentState.set(ExecutionState.RUNNING);
@@ -1626,7 +1626,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
- assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
+ assertTrue(checkpointCoordinator.getNumberOfPendingCheckpoints() > 0);
}
catch (Exception e) {
e.printStackTrace();
@@ -1651,7 +1651,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new CheckpointCoordinatorConfigurationBuilder()
.setMaxConcurrentCheckpoints(1) // max one checkpoint at a time => should not affect savepoints
.build();
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setJobId(jobId)
.setCheckpointCoordinatorConfiguration(chkConfig)
@@ -1667,7 +1667,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// Trigger savepoints
for (int i = 0; i < numSavepoints; i++) {
- savepointFutures.add(coord.triggerSavepoint(savepointDir));
+ savepointFutures.add(checkpointCoordinator.triggerSavepoint(savepointDir));
}
// After triggering multiple savepoints, all should in progress
@@ -1680,7 +1680,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// ACK all savepoints
long checkpointId = checkpointIDCounter.getLast();
for (int i = 0; i < numSavepoints; i++, checkpointId--) {
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
}
// After ACKs, all should be completed
@@ -1699,7 +1699,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setMinPauseBetweenCheckpoints(100000000L) // very long min delay => should not affect savepoints
.setMaxConcurrentCheckpoints(1)
.build();
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setCheckpointCoordinatorConfiguration(chkConfig)
.setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
@@ -1708,10 +1708,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
- CompletableFuture<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepoint0 = checkpointCoordinator.triggerSavepoint(savepointDir);
assertFalse("Did not trigger savepoint", savepoint0.isDone());
- CompletableFuture<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepoint1 = checkpointCoordinator.triggerSavepoint(savepointDir);
assertFalse("Did not trigger savepoint", savepoint1.isDone());
}
@@ -1727,18 +1727,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
new CheckpointCoordinatorConfigurationBuilder()
.setCheckpointRetentionPolicy(CheckpointRetentionPolicy.RETAIN_ON_FAILURE)
.build();
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setCheckpointCoordinatorConfiguration(chkConfig)
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
CompletableFuture<CompletedCheckpoint> checkpointFuture =
- coord.triggerCheckpoint(false);
+ checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture.isCompletedExceptionally());
- for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) {
+ for (PendingCheckpoint checkpoint : checkpointCoordinator.getPendingCheckpoints().values()) {
CheckpointProperties props = checkpoint.getProps();
CheckpointProperties expected = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
@@ -1746,7 +1746,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
// the now we should have a completed checkpoint
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
e.printStackTrace();
@@ -1948,20 +1948,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- coord.setCheckpointStatsTracker(tracker);
+ checkpointCoordinator.setCheckpointStatsTracker(tracker);
when(tracker.reportPendingCheckpoint(anyLong(), anyLong(), any(CheckpointProperties.class)))
.thenReturn(mock(PendingCheckpointStats.class));
// Trigger a checkpoint and verify callback
CompletableFuture<CompletedCheckpoint> checkpointFuture =
- coord.triggerCheckpoint(false);
+ checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(checkpointFuture.isCompletedExceptionally());
@@ -1977,7 +1977,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setCompletedCheckpointStore(store)
.setTimer(manuallyTriggeredScheduledExecutor)
@@ -1994,9 +1994,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
new TestCompletedCheckpointStorageLocation()));
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- coord.setCheckpointStatsTracker(tracker);
+ checkpointCoordinator.setCheckpointStatsTracker(tracker);
- assertTrue(coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
+ assertTrue(checkpointCoordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
verify(tracker, times(1))
.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
@@ -2005,7 +2005,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Test
public void testSharedStateRegistrationOnRestore() throws Exception {
- final JobID jid = new JobID();
+ final JobID jobId = new JobID();
final JobVertexID jobVertexID1 = new JobVertexID();
@@ -2029,9 +2029,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
final List<SharedStateRegistry> createdSharedStateRegistries = new ArrayList<>(2);
// set up the coordinator and validate the initial state
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
- .setJobId(jid)
+ .setJobId(jobId)
.setTasks(arrayExecutionVertices)
.setCompletedCheckpointStore(store)
.setTimer(manuallyTriggeredScheduledExecutor)
@@ -2049,10 +2049,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
for (int i = 0; i < numCheckpoints; ++i) {
- performIncrementalCheckpoint(jid, coord, jobVertex1, keyGroupPartitions1, i);
+ performIncrementalCheckpoint(jobId, checkpointCoordinator, jobVertex1, keyGroupPartitions1, i);
}
- List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
+ List<CompletedCheckpoint> completedCheckpoints = checkpointCoordinator.getSuccessfulCheckpoints();
assertEquals(numCheckpoints, completedCheckpoints.size());
int sharedHandleCount = 0;
@@ -2112,7 +2112,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// restore the store
Set<ExecutionJobVertex> tasks = new HashSet<>();
tasks.add(jobVertex1);
- assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+ assertTrue(checkpointCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
// validate that all shared states are registered again after the recovery.
cp = 0;
@@ -2225,18 +2225,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
public void testTriggerCheckpointAfterCancel() throws Exception {
// set up the coordinator
TestingCheckpointIDCounter idCounter = new TestingCheckpointIDCounter();
- CheckpointCoordinator coord =
+ CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setCheckpointIDCounter(idCounter)
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
- idCounter.setOwner(coord);
+ idCounter.setOwner(checkpointCoordinator);
try {
// start the coordinator
- coord.startCheckpointScheduler();
+ checkpointCoordinator.startCheckpointScheduler();
final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
- coord.triggerCheckpoint(
+ checkpointCoordinator.triggerCheckpoint(
CheckpointProperties
.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
null,
@@ -2254,7 +2254,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointExceptionOptional.get().getCheckpointFailureReason());
}
} finally {
- coord.shutdown(JobStatus.FINISHED);
+ checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
}
@@ -2387,18 +2387,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
private void performIncrementalCheckpoint(
- JobID jid,
- CheckpointCoordinator coord,
+ JobID jobId,
+ CheckpointCoordinator checkpointCoordinator,
ExecutionJobVertex jobVertex1,
List<KeyGroupRange> keyGroupPartitions1,
int cpSequenceNumber) throws Exception {
// trigger the checkpoint
- coord.triggerCheckpoint(false);
+ checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(1, coord.getPendingCheckpoints().size());
- long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+ assertEquals(1, checkpointCoordinator.getPendingCheckpoints().size());
+ long checkpointId = Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
@@ -2445,13 +2445,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
+ jobId,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
checkpointId,
new CheckpointMetrics(),
taskStateSnapshot);
- coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
}
}