You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/20 06:59:40 UTC
[flink] 02/03: [FLINK-8871][checkpoint] Support to cancel
checkpoint via notification on checkpoint coordinator side
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fcacc42e17f00cb47c5c16fe75af035f784ae1fa
Author: Yun Tang <my...@live.com>
AuthorDate: Mon May 11 13:49:18 2020 +0800
[FLINK-8871][checkpoint] Support to cancel checkpoint via notification on checkpoint coordinator side
---
.../runtime/checkpoint/CheckpointCoordinator.java | 16 +++++++++++++++
.../checkpoint/CheckpointFailureManager.java | 1 +
.../checkpoint/CheckpointFailureReason.java | 2 ++
.../flink/runtime/executiongraph/Execution.java | 19 ++++++++++++++++++
.../jobmanager/slots/TaskManagerGateway.java | 14 +++++++++++++
.../runtime/jobmaster/RpcTaskManagerGateway.java | 5 +++++
.../flink/runtime/taskexecutor/TaskExecutor.java | 23 +++++++++++++++++++++-
.../runtime/taskexecutor/TaskExecutorGateway.java | 11 +++++++++++
.../checkpoint/CheckpointCoordinatorTest.java | 7 +++++++
.../utils/SimpleAckingTaskManagerGateway.java | 7 +++++++
.../taskexecutor/TestingTaskExecutorGateway.java | 5 +++++
11 files changed, 109 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9e2a4a7..7689f29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -113,6 +113,7 @@ public class CheckpointCoordinator {
private final ExecutionVertex[] tasksToWaitFor;
/** Tasks who need to be sent a message when a checkpoint is confirmed. */
+ // TODO currently we use commit vertices to receive "abort checkpoint" messages.
private final ExecutionVertex[] tasksToCommitTo;
/** The operator coordinators that need to be checkpointed. */
@@ -1015,6 +1016,7 @@ public class CheckpointCoordinator {
}
});
+ sendAbortedMessages(checkpointId, pendingCheckpoint.getCheckpointTimestamp());
throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
}
@@ -1067,6 +1069,19 @@ public class CheckpointCoordinator {
}
}
+ private void sendAbortedMessages(long checkpointId, long timeStamp) {
+ // send notification of aborted checkpoints asynchronously.
+ executor.execute(() -> {
+ // send the "abort checkpoint" messages to necessary vertices.
+ for (ExecutionVertex ev : tasksToCommitTo) {
+ Execution ee = ev.getCurrentExecutionAttempt();
+ if (ee != null) {
+ ee.notifyCheckpointAborted(checkpointId, timeStamp);
+ }
+ }
+ });
+ }
+
/**
* Fails all pending checkpoints which have not been acknowledged by the given execution
* attempt id.
@@ -1576,6 +1591,7 @@ public class CheckpointCoordinator {
exception, pendingCheckpoint.getCheckpointId());
}
} finally {
+ sendAbortedMessages(pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointTimestamp());
pendingCheckpoints.remove(pendingCheckpoint.getCheckpointId());
rememberRecentCheckpointId(pendingCheckpoint.getCheckpointId());
timer.execute(this::executeQueuedRequest);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 9e162e7..0dc655b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -120,6 +120,7 @@ public class CheckpointFailureManager {
case CHECKPOINT_EXPIRED:
case TASK_FAILURE:
case TASK_CHECKPOINT_FAILURE:
+ case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
case TRIGGER_CHECKPOINT_FAILURE:
case FINALIZE_CHECKPOINT_FAILURE:
//ignore
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 023f9bf..cd787d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -68,6 +68,8 @@ public enum CheckpointFailureReason {
TASK_CHECKPOINT_FAILURE(false, "Task local checkpoint failure."),
+ UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE(false, "Unknown task for the checkpoint to notify."),
+
FINALIZE_CHECKPOINT_FAILURE(false, "Failure to finalize checkpoint."),
TRIGGER_CHECKPOINT_FAILURE(false, "Trigger checkpoint failure.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 7d60a6c..0102ce7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -994,6 +994,25 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
/**
+ * Notify the task of this execution about an aborted checkpoint.
+ *
+ * @param abortCheckpointId ID of the aborted checkpoint
+ * @param timestamp of the aborted checkpoint
+ */
+ public void notifyCheckpointAborted(long abortCheckpointId, long timestamp) {
+ final LogicalSlot slot = assignedResource;
+
+ if (slot != null) {
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+ taskManagerGateway.notifyCheckpointAborted(attemptId, getVertex().getJobId(), abortCheckpointId, timestamp);
+ } else {
+ LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
+ "no longer running.");
+ }
+ }
+
+ /**
* Trigger a new checkpoint on the task of this execution.
*
* @param checkpointId of th checkpoint to trigger
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index da21982..e7cbeae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -120,6 +120,20 @@ public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway {
long timestamp);
/**
+ * Notify the given task about an aborted checkpoint.
+ *
+ * @param executionAttemptID identifying the task
+ * @param jobId identifying the job to which the task belongs
+ * @param checkpointId of the aborted checkpoint
+ * @param timestamp of the aborted checkpoint
+ */
+ void notifyCheckpointAborted(
+ ExecutionAttemptID executionAttemptID,
+ JobID jobId,
+ long checkpointId,
+ long timestamp);
+
+ /**
* Trigger for the given task a checkpoint.
*
* @param executionAttemptID identifying the task
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 9aec97e..2a20b71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -93,6 +93,11 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
}
@Override
+ public void notifyCheckpointAborted(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+ taskExecutorGateway.abortCheckpoint(executionAttemptID, checkpointId, timestamp);
+ }
+
+ @Override
public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
taskExecutorGateway.triggerCheckpoint(
executionAttemptID,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d74532f..f8438f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -860,7 +860,28 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
log.debug(message);
- return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
+ return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
+ }
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> abortCheckpoint(
+ ExecutionAttemptID executionAttemptID,
+ long checkpointId,
+ long checkpointTimestamp) {
+ log.debug("Abort checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
+
+ final Task task = taskSlotTable.getTask(executionAttemptID);
+
+ if (task != null) {
+ task.notifyCheckpointAborted(checkpointId);
+
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ } else {
+ final String message = "TaskManager received an aborted checkpoint for unknown task " + executionAttemptID + '.';
+
+ log.debug(message);
+ return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ad37db0..6867451 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -156,6 +156,17 @@ public interface TaskExecutorGateway extends RpcGateway, TaskExecutorOperatorEve
CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
/**
+ * Abort a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
+ * and the checkpoint timestamp.
+ *
+ * @param executionAttemptID identifying the task
+ * @param checkpointId unique id for the checkpoint
+ * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+ * @return Future acknowledge if the task has been successfully notified of the aborted checkpoint
+ */
+ CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
+
+ /**
* Cancel the given task.
*
* @param executionAttemptID identifying the task
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ac48106..e799836 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -425,6 +425,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
// decline checkpoint from one of the tasks, this should cancel the checkpoint
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+
assertTrue(checkpoint1.isDiscarded());
// validate that we have only one pending checkpoint left
@@ -453,6 +456,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
assertTrue(checkpoint1.isDiscarded());
+ // will not notify abort message again
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+
coord.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 7e6ee2d..59aee81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -129,6 +129,13 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
long timestamp) {}
@Override
+ public void notifyCheckpointAborted(
+ ExecutionAttemptID executionAttemptID,
+ JobID jobId,
+ long checkpointId,
+ long timestamp) {}
+
+ @Override
public void triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index f7e82d2..2a039c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -169,6 +169,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
}
@Override
+ public CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
+ @Override
public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
return cancelTaskFunction.apply(executionAttemptID);
}