You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/12/16 12:21:21 UTC

[flink] 03/05: [refactor] Extract logCheckpointInfo

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6965930b3305be1a35fa5e63c6af11f884ab0d1
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Dec 9 16:37:03 2021 +0100

    [refactor] Extract logCheckpointInfo
---
 .../runtime/checkpoint/CheckpointCoordinator.java    | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 7d9bda9..8f2ebfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1234,9 +1234,20 @@ public class CheckpointCoordinator {
         // the 'min delay between checkpoints'
         lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
 
+        logCheckpointInfo(completedCheckpoint);
+
+        // send the "notify complete" call to all vertices, coordinators, etc.
+        sendAcknowledgeMessages(
+                pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
+                checkpointId,
+                completedCheckpoint.getTimestamp(),
+                extractIdIfDiscardedOnSubsumed(lastSubsumed));
+    }
+
+    private void logCheckpointInfo(CompletedCheckpoint completedCheckpoint) {
         LOG.info(
                 "Completed checkpoint {} for job {} ({} bytes, checkpointDuration={} ms, finalizationTime={} ms).",
-                checkpointId,
+                completedCheckpoint.getCheckpointID(),
                 job,
                 completedCheckpoint.getStateSize(),
                 completedCheckpoint.getCompletionTimestamp() - completedCheckpoint.getTimestamp(),
@@ -1254,13 +1265,6 @@ public class CheckpointCoordinator {
 
             LOG.debug(builder.toString());
         }
-
-        // send the "notify complete" call to all vertices, coordinators, etc.
-        sendAcknowledgeMessages(
-                pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
-                checkpointId,
-                completedCheckpoint.getTimestamp(),
-                extractIdIfDiscardedOnSubsumed(lastSubsumed));
     }
 
     private CompletedCheckpoint finalizeCheckpoint(PendingCheckpoint pendingCheckpoint)