You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/12/16 12:21:21 UTC
[flink] 03/05: [refactor] Extract logCheckpointInfo
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6965930b3305be1a35fa5e63c6af11f884ab0d1
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Dec 9 16:37:03 2021 +0100
[refactor] Extract logCheckpointInfo
---
.../runtime/checkpoint/CheckpointCoordinator.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 7d9bda9..8f2ebfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1234,9 +1234,20 @@ public class CheckpointCoordinator {
// the 'min delay between checkpoints'
lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
+ logCheckpointInfo(completedCheckpoint);
+
+ // send the "notify complete" call to all vertices, coordinators, etc.
+ sendAcknowledgeMessages(
+ pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
+ checkpointId,
+ completedCheckpoint.getTimestamp(),
+ extractIdIfDiscardedOnSubsumed(lastSubsumed));
+ }
+
+ private void logCheckpointInfo(CompletedCheckpoint completedCheckpoint) {
LOG.info(
"Completed checkpoint {} for job {} ({} bytes, checkpointDuration={} ms, finalizationTime={} ms).",
- checkpointId,
+ completedCheckpoint.getCheckpointID(),
job,
completedCheckpoint.getStateSize(),
completedCheckpoint.getCompletionTimestamp() - completedCheckpoint.getTimestamp(),
@@ -1254,13 +1265,6 @@ public class CheckpointCoordinator {
LOG.debug(builder.toString());
}
-
- // send the "notify complete" call to all vertices, coordinators, etc.
- sendAcknowledgeMessages(
- pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
- checkpointId,
- completedCheckpoint.getTimestamp(),
- extractIdIfDiscardedOnSubsumed(lastSubsumed));
}
private CompletedCheckpoint finalizeCheckpoint(PendingCheckpoint pendingCheckpoint)