You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 09:58:23 UTC

[flink] 03/04: [FLINK-25958][runtime] Report completed statistic only after the completed checkpoint will be added to checkpoint store

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 906f3244a3febf3bfe5221290d4cd3ad4746765f
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:55:31 2022 +0100

    [FLINK-25958][runtime] Report completed statistic only after the completed checkpoint will be added to checkpoint store
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 31 ++++++++++++++++++++--
 .../runtime/checkpoint/PendingCheckpoint.java      | 18 +------------
 .../checkpoint/CheckpointCoordinatorTest.java      | 21 +++++++++++++--
 .../runtime/checkpoint/PendingCheckpointTest.java  |  5 ++--
 4 files changed, 51 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5248538..5deee51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1256,11 +1256,39 @@ public class CheckpointCoordinator {
             } else {
                 lastSubsumed = null;
             }
+
+            reportCompletedCheckpoint(completedCheckpoint);
         } finally {
             pendingCheckpoints.remove(checkpointId);
             scheduleTriggerRequest();
         }
 
+        cleanupAfterCompletedCheckpoint(
+                pendingCheckpoint, checkpointId, completedCheckpoint, lastSubsumed, props);
+    }
+
+    private void reportCompletedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        CompletedCheckpointStats completedCheckpointStats = completedCheckpoint.getStatistic();
+        if (completedCheckpointStats != null) {
+            LOG.trace(
+                    "Checkpoint {} size: {}Kb, duration: {}ms",
+                    completedCheckpoint.getCheckpointID(),
+                    completedCheckpointStats.getStateSize() == 0
+                            ? 0
+                            : completedCheckpointStats.getStateSize() / 1024,
+                    completedCheckpointStats.getEndToEndDuration());
+            // Hand the completed checkpoint's statistics to the stats tracker;
+            // nothing is logged or reported when getStatistic() returned null.
+            statsTracker.reportCompletedCheckpoint(completedCheckpointStats);
+        }
+    }
+
+    private void cleanupAfterCompletedCheckpoint(
+            PendingCheckpoint pendingCheckpoint,
+            long checkpointId,
+            CompletedCheckpoint completedCheckpoint,
+            CompletedCheckpoint lastSubsumed,
+            CheckpointProperties props) {
         // remember recent checkpoint id for debugging purposes
         rememberRecentCheckpointId(checkpointId);
 
@@ -1313,8 +1341,7 @@ public class CheckpointCoordinator {
                     pendingCheckpoint.finalizeCheckpoint(
                             checkpointsCleaner,
                             this::scheduleTriggerRequest,
-                            executor,
-                            statsTracker);
+                            executor);
 
             failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
             return completedCheckpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 76c6a44..08cd23f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -303,10 +303,7 @@ public class PendingCheckpoint implements Checkpoint {
     }
 
     public CompletedCheckpoint finalizeCheckpoint(
-            CheckpointsCleaner checkpointsCleaner,
-            Runnable postCleanup,
-            Executor executor,
-            CheckpointStatsTracker statsTracker)
+            CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, Executor executor)
             throws IOException {
 
         synchronized (lock) {
@@ -342,19 +339,6 @@ public class PendingCheckpoint implements Checkpoint {
                                 finalizedLocation,
                                 toCompletedCheckpointStats(finalizedLocation));
 
-                CompletedCheckpointStats completedCheckpointStats = completed.getStatistic();
-                if (completedCheckpointStats != null) {
-                    LOG.trace(
-                            "Checkpoint {} size: {}Kb, duration: {}ms",
-                            checkpointId,
-                            completedCheckpointStats.getStateSize() == 0
-                                    ? 0
-                                    : completedCheckpointStats.getStateSize() / 1024,
-                            completedCheckpointStats.getEndToEndDuration());
-
-                    statsTracker.reportCompletedCheckpoint(completedCheckpointStats);
-                }
-
                 onCompletionPromise.complete(completed);
 
                 // mark this pending checkpoint as disposed, but do NOT drop the state
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 2cfa183..df8d606 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1988,9 +1988,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
         ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
-
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
         // set up the coordinator and validate the initial state
-        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setExecutionGraph(graph)
+                        .setCheckpointCoordinatorConfiguration(
+                                CheckpointCoordinatorConfiguration.builder()
+                                        .setAlignedCheckpointTimeout(Long.MAX_VALUE)
+                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                                        .build())
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCheckpointStatsTracker(statsTracker)
+                        .build();
 
         assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
         assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -2083,6 +2094,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
         assertEquals(pending.getCheckpointId(), success.getCheckpointID());
         assertEquals(2, success.getOperatorStates().size());
 
+        AbstractCheckpointStats actualStats =
+                statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
+
+        assertEquals(checkpointId, actualStats.getCheckpointId());
+        assertEquals(CheckpointStatsStatus.COMPLETED, actualStats.getStatus());
+
         checkpointCoordinator.shutdown();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 294c6fa..799d743 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -217,8 +217,7 @@ public class PendingCheckpointTest {
         assertFalse(future.isDone());
         pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         assertTrue(pending.areTasksFullyAcknowledged());
-        pending.finalizeCheckpoint(
-                new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
+        pending.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
         assertTrue(future.isDone());
 
         // Finalize (missing ACKs)
@@ -228,7 +227,7 @@ public class PendingCheckpointTest {
         assertFalse(future.isDone());
         try {
             pending.finalizeCheckpoint(
-                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
+                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
             fail("Did not throw expected Exception");
         } catch (IllegalStateException ignored) {
             // Expected