You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 09:58:21 UTC

[flink] 01/04: [FLINK-25958][refactor][runtime] Separated the logic of creating and reporting the statistic in order to use it in different place in the future

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e7d45d53b7ea7b9cfadf2e293ba790f3a9e90c3
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:41:20 2022 +0100

    [FLINK-25958][refactor][runtime] Separated the logic of creating and reporting the statistic in order to use it in different place in the future
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  35 ++++----
 .../runtime/checkpoint/CheckpointStatsTracker.java |  43 +--------
 .../flink/runtime/checkpoint/Checkpoints.java      |   3 +-
 .../runtime/checkpoint/CompletedCheckpoint.java    |  31 ++++---
 .../checkpoint/CompletedCheckpointStats.java       |  24 +----
 .../runtime/checkpoint/FailedCheckpointStats.java  |  17 ----
 .../runtime/checkpoint/PendingCheckpoint.java      |  65 ++++++++------
 .../runtime/checkpoint/PendingCheckpointStats.java |  90 +++++++------------
 .../CheckpointCoordinatorMasterHooksTest.java      |   6 +-
 .../CheckpointCoordinatorRestoringTest.java        |   9 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |   3 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |   6 +-
 .../checkpoint/CheckpointStatsTrackerTest.java     |  16 ++--
 .../checkpoint/CompletedCheckpointStoreTest.java   |   3 +-
 .../checkpoint/CompletedCheckpointTest.java        |  62 +++++++++----
 .../DefaultCompletedCheckpointStoreTest.java       |   3 +-
 .../DefaultCompletedCheckpointStoreUtilsTest.java  |   3 +-
 .../checkpoint/PendingCheckpointStatsTest.java     |  32 ++-----
 .../runtime/checkpoint/PendingCheckpointTest.java  | 100 +++------------------
 .../StandaloneCompletedCheckpointStoreTest.java    |   3 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     |   3 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   3 +-
 .../runtime/scheduler/SchedulerUtilsTest.java      |   3 +-
 ...topWithSavepointTerminationHandlerImplTest.java |   3 +-
 24 files changed, 218 insertions(+), 348 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 335ea9a..5248538 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -779,6 +779,9 @@ public class CheckpointCoordinator {
             }
         }
 
+        PendingCheckpointStats pendingCheckpointStats =
+                trackPendingCheckpointStats(checkpointID, checkpointPlan, props, timestamp);
+
         final PendingCheckpoint checkpoint =
                 new PendingCheckpoint(
                         job,
@@ -789,9 +792,8 @@ public class CheckpointCoordinator {
                         masterHooks.keySet(),
                         props,
                         checkpointStorageLocation,
-                        onCompletionPromise);
-
-        trackPendingCheckpointStats(checkpoint);
+                        onCompletionPromise,
+                        pendingCheckpointStats);
 
         synchronized (lock) {
             pendingCheckpoints.put(checkpointID, checkpoint);
@@ -1122,8 +1124,7 @@ public class CheckpointCoordinator {
                 switch (checkpoint.acknowledgeTask(
                         message.getTaskExecutionId(),
                         message.getSubtaskState(),
-                        message.getCheckpointMetrics(),
-                        getStatsCallback(checkpoint))) {
+                        message.getCheckpointMetrics())) {
                     case SUCCESS:
                         LOG.debug(
                                 "Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
@@ -1313,7 +1314,7 @@ public class CheckpointCoordinator {
                             checkpointsCleaner,
                             this::scheduleTriggerRequest,
                             executor,
-                            getStatsCallback(pendingCheckpoint));
+                            statsTracker);
 
             failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
             return completedCheckpoint;
@@ -2041,7 +2042,7 @@ public class CheckpointCoordinator {
                         checkpointsCleaner,
                         this::scheduleTriggerRequest,
                         executor,
-                        getStatsCallback(pendingCheckpoint));
+                        statsTracker);
 
                 failureManager.handleCheckpointException(
                         pendingCheckpoint,
@@ -2180,11 +2181,15 @@ public class CheckpointCoordinator {
         SKIP;
     }
 
-    private void trackPendingCheckpointStats(PendingCheckpoint checkpoint) {
+    private PendingCheckpointStats trackPendingCheckpointStats(
+            long checkpointId,
+            CheckpointPlan checkpointPlan,
+            CheckpointProperties props,
+            long checkpointTimestamp) {
         Map<JobVertexID, Integer> vertices =
                 Stream.concat(
-                                checkpoint.getCheckpointPlan().getTasksToWaitFor().stream(),
-                                checkpoint.getCheckpointPlan().getFinishedTasks().stream())
+                                checkpointPlan.getTasksToWaitFor().stream(),
+                                checkpointPlan.getFinishedTasks().stream())
                         .map(Execution::getVertex)
                         .map(ExecutionVertex::getJobVertex)
                         .distinct()
@@ -2195,13 +2200,11 @@ public class CheckpointCoordinator {
 
         PendingCheckpointStats pendingCheckpointStats =
                 statsTracker.reportPendingCheckpoint(
-                        checkpoint.getCheckpointID(),
-                        checkpoint.getCheckpointTimestamp(),
-                        checkpoint.getProps(),
-                        vertices);
+                        checkpointId, checkpointTimestamp, props, vertices);
+
+        reportFinishedTasks(pendingCheckpointStats, checkpointPlan.getFinishedTasks());
 
-        reportFinishedTasks(
-                pendingCheckpointStats, checkpoint.getCheckpointPlan().getFinishedTasks());
+        return pendingCheckpointStats;
     }
 
     private void reportFinishedTasks(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index c691614..f10a668 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -160,12 +160,7 @@ public class CheckpointStatsTracker {
             Map<JobVertexID, Integer> vertexToDop) {
 
         PendingCheckpointStats pending =
-                new PendingCheckpointStats(
-                        checkpointId,
-                        triggerTimestamp,
-                        props,
-                        vertexToDop,
-                        PendingCheckpointStatsCallback.proxyFor(this));
+                new PendingCheckpointStats(checkpointId, triggerTimestamp, props, vertexToDop);
 
         statsReadWriteLock.lock();
         try {
@@ -204,7 +199,7 @@ public class CheckpointStatsTracker {
      *
      * @param completed The completed checkpoint stats.
      */
-    private void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+    void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
         statsReadWriteLock.lock();
         try {
             latestCompletedCheckpoint = completed;
@@ -225,7 +220,7 @@ public class CheckpointStatsTracker {
      *
      * @param failed The failed checkpoint stats.
      */
-    private void reportFailedCheckpoint(FailedCheckpointStats failed) {
+    void reportFailedCheckpoint(FailedCheckpointStats failed) {
         statsReadWriteLock.lock();
         try {
             counts.incrementFailedCheckpoints();
@@ -276,38 +271,6 @@ public class CheckpointStatsTracker {
         }
     }
 
-    /** Callback for finalization of a pending checkpoint. */
-    interface PendingCheckpointStatsCallback {
-        /**
-         * Report a completed checkpoint.
-         *
-         * @param completed The completed checkpoint.
-         */
-        void reportCompletedCheckpoint(CompletedCheckpointStats completed);
-
-        /**
-         * Report a failed checkpoint.
-         *
-         * @param failed The failed checkpoint.
-         */
-        void reportFailedCheckpoint(FailedCheckpointStats failed);
-
-        static PendingCheckpointStatsCallback proxyFor(
-                CheckpointStatsTracker checkpointStatsTracker) {
-            return new PendingCheckpointStatsCallback() {
-                @Override
-                public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
-                    checkpointStatsTracker.reportCompletedCheckpoint(completed);
-                }
-
-                @Override
-                public void reportFailedCheckpoint(FailedCheckpointStats failed) {
-                    checkpointStatsTracker.reportFailedCheckpoint(failed);
-                }
-            };
-        }
-    }
-
     // ------------------------------------------------------------------------
     // Metrics
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 7d27d50..cc97510 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -215,7 +215,8 @@ public class Checkpoints {
                 checkpointProperties,
                 restoreMode == RestoreMode.CLAIM
                         ? new ClaimModeCompletedStorageLocation(location)
-                        : location);
+                        : location,
+                null);
     }
 
     private static void throwNonRestoredStateException(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 1972427..841e543 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -104,8 +104,8 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
     /** External pointer to the completed checkpoint (for example file path). */
     private final String externalPointer;
 
-    /** Optional stats tracker callback for discard. */
-    @Nullable private transient volatile CompletedCheckpointStats.DiscardCallback discardCallback;
+    /** Statistics of the completed checkpoint, used to mark it as discarded. */
+    @Nullable private final transient CompletedCheckpointStats completedCheckpointStats;
 
     // ------------------------------------------------------------------------
 
@@ -117,7 +117,8 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
             Map<OperatorID, OperatorState> operatorStates,
             @Nullable Collection<MasterState> masterHookStates,
             CheckpointProperties props,
-            CompletedCheckpointStorageLocation storageLocation) {
+            CompletedCheckpointStorageLocation storageLocation,
+            @Nullable CompletedCheckpointStats completedCheckpointStats) {
 
         checkArgument(checkpointID >= 0);
         checkArgument(timestamp >= 0);
@@ -140,6 +141,7 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
         this.storageLocation = checkNotNull(storageLocation);
         this.metadataHandle = storageLocation.getMetadataHandle();
         this.externalPointer = storageLocation.getExternalPointer();
+        this.completedCheckpointStats = completedCheckpointStats;
     }
 
     // ------------------------------------------------------------------------
@@ -274,14 +276,19 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
         } finally {
             operatorStates.clear();
 
-            // to be null-pointer safe, copy reference to stack
-            CompletedCheckpointStats.DiscardCallback discardCallback = this.discardCallback;
-            if (discardCallback != null) {
-                discardCallback.notifyDiscardedCheckpoint();
+            if (completedCheckpointStats != null) {
+                completedCheckpointStats.discard();
             }
         }
     }
 
+    /** NOT thread-safe. This method may only be called from the CheckpointCoordinator thread. */
+    public void markAsDiscarded() {
+        if (completedCheckpointStats != null) {
+            completedCheckpointStats.discard();
+        }
+    }
+
     public boolean shouldBeDiscardedOnSubsume() {
         return props.discardOnSubsumed();
     }
@@ -320,13 +327,9 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
         return firstInterestingFields.equals(secondInterestingFields);
     }
 
-    /**
-     * Sets the callback for tracking when this checkpoint is discarded.
-     *
-     * @param discardCallback Callback to call when the checkpoint is discarded.
-     */
-    void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback discardCallback) {
-        this.discardCallback = discardCallback;
+    @Nullable
+    public CompletedCheckpointStats getStatistic() {
+        return completedCheckpointStats;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
index 38a5c9f..88339ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -183,28 +183,10 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
     }
 
     /**
-     * Returns the callback for the {@link CompletedCheckpoint}.
-     *
-     * @return Callback for the {@link CompletedCheckpoint}.
-     */
-    DiscardCallback getDiscardCallback() {
-        return new DiscardCallback();
-    }
-
-    /**
-     * Callback for the {@link CompletedCheckpoint} instance to notify about disposal of the
-     * checkpoint (most commonly when the checkpoint has been subsumed by a newer one).
+     * Marks the checkpoint as discarded.
      */
-    class DiscardCallback {
-
-        /**
-         * Updates the discarded flag of the checkpoint stats.
-         *
-         * <p>After this notification, {@link #isDiscarded()} will return <code>true</code>.
-         */
-        void notifyDiscardedCheckpoint() {
-            discarded = true;
-        }
+    void discard() {
+        discarded = true;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
index f9d3a0d..43a2a1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.PendingCheckpointStatsCallback;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nullable;
@@ -82,7 +81,6 @@ public class FailedCheckpointStats extends PendingCheckpointStats {
                 totalSubtaskCount,
                 numAcknowledgedSubtasks,
                 taskStats,
-                FAILING_REPORT_CALLBACK,
                 checkpointedSize,
                 stateSize,
                 processedData,
@@ -122,19 +120,4 @@ public class FailedCheckpointStats extends PendingCheckpointStats {
     public String getFailureMessage() {
         return failureMsg;
     }
-
-    private static final PendingCheckpointStatsCallback FAILING_REPORT_CALLBACK =
-            new PendingCheckpointStatsCallback() {
-                @Override
-                public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
-                    throw new UnsupportedOperationException(
-                            "Failed checkpoint stats can't be completed");
-                }
-
-                @Override
-                public void reportFailedCheckpoint(FailedCheckpointStats failed) {
-                    throw new UnsupportedOperationException(
-                            "Failed checkpoint stats can't be failed");
-                }
-            };
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 8521722..aee95dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -111,6 +111,8 @@ public class PendingCheckpoint implements Checkpoint {
     /** The promise to fulfill once the checkpoint has been completed. */
     private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
 
+    @Nullable private final PendingCheckpointStats pendingCheckpointStats;
+
     private int numAcknowledgedTasks;
 
     private boolean disposed;
@@ -132,8 +134,8 @@ public class PendingCheckpoint implements Checkpoint {
             Collection<String> masterStateIdentifiers,
             CheckpointProperties props,
             CheckpointStorageLocation targetLocation,
-            CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
-
+            CompletableFuture<CompletedCheckpoint> onCompletionPromise,
+            @Nullable PendingCheckpointStats pendingCheckpointStats) {
         checkArgument(
                 checkpointPlan.getTasksToWaitFor().size() > 0,
                 "Checkpoint needs at least one vertex that commits the checkpoint");
@@ -163,6 +165,7 @@ public class PendingCheckpoint implements Checkpoint {
                         : new HashSet<>(operatorCoordinatorsToConfirm);
         this.acknowledgedTasks = new HashSet<>(checkpointPlan.getTasksToWaitFor().size());
         this.onCompletionPromise = checkNotNull(onCompletionPromise);
+        this.pendingCheckpointStats = pendingCheckpointStats;
     }
 
     // --------------------------------------------------------------------------------------------
@@ -301,7 +304,7 @@ public class PendingCheckpoint implements Checkpoint {
             CheckpointsCleaner checkpointsCleaner,
             Runnable postCleanup,
             Executor executor,
-            @Nullable PendingCheckpointStats statsCallback)
+            CheckpointStatsTracker statsTracker)
             throws IOException {
 
         synchronized (lock) {
@@ -334,26 +337,24 @@ public class PendingCheckpoint implements Checkpoint {
                                 operatorStates,
                                 masterStates,
                                 props,
-                                finalizedLocation);
+                                finalizedLocation,
+                                toCompletedCheckpointStats(finalizedLocation));
 
-                onCompletionPromise.complete(completed);
-
-                if (statsCallback != null) {
+                CompletedCheckpointStats completedCheckpointStats = completed.getStatistic();
+                if (completedCheckpointStats != null) {
                     LOG.trace(
                             "Checkpoint {} size: {}Kb, duration: {}ms",
                             checkpointId,
-                            statsCallback.getStateSize() == 0
+                            completedCheckpointStats.getStateSize() == 0
                                     ? 0
-                                    : statsCallback.getStateSize() / 1024,
-                            statsCallback.getEndToEndDuration());
-                    // Finalize the statsCallback and give the completed checkpoint a
-                    // callback for discards.
-                    CompletedCheckpointStats.DiscardCallback discardCallback =
-                            statsCallback.reportCompletedCheckpoint(
-                                    finalizedLocation.getExternalPointer());
-                    completed.setDiscardCallback(discardCallback);
+                                    : completedCheckpointStats.getStateSize() / 1024,
+                            completedCheckpointStats.getEndToEndDuration());
+
+                    statsTracker.reportCompletedCheckpoint(completedCheckpointStats);
                 }
 
+                onCompletionPromise.complete(completed);
+
                 // mark this pending checkpoint as disposed, but do NOT drop the state
                 dispose(false, checkpointsCleaner, postCleanup, executor);
 
@@ -366,6 +367,15 @@ public class PendingCheckpoint implements Checkpoint {
         }
     }
 
+    @Nullable
+    private CompletedCheckpointStats toCompletedCheckpointStats(
+            CompletedCheckpointStorageLocation finalizedLocation) {
+        return pendingCheckpointStats != null
+                ? pendingCheckpointStats.toCompletedCheckpointStats(
+                        finalizedLocation.getExternalPointer())
+                : null;
+    }
+
     /**
      * Acknowledges the task with the given execution attempt id and the given subtask state.
      *
@@ -377,8 +387,7 @@ public class PendingCheckpoint implements Checkpoint {
     public TaskAcknowledgeResult acknowledgeTask(
             ExecutionAttemptID executionAttemptId,
             TaskStateSnapshot operatorSubtaskStates,
-            CheckpointMetrics metrics,
-            @Nullable PendingCheckpointStats statsCallback) {
+            CheckpointMetrics metrics) {
 
         synchronized (lock) {
             if (disposed) {
@@ -415,7 +424,7 @@ public class PendingCheckpoint implements Checkpoint {
 
             // publish the checkpoint statistics
             // to prevent null-pointers from concurrent modification, copy reference onto stack
-            if (statsCallback != null) {
+            if (pendingCheckpointStats != null) {
                 // Do this in millis because the web frontend works with them
                 long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
                 long checkpointStartDelayMillis =
@@ -443,10 +452,12 @@ public class PendingCheckpoint implements Checkpoint {
                         subtaskStateStats.getStateSize() == 0
                                 ? 0
                                 : subtaskStateStats.getStateSize() / 1024,
-                        subtaskStateStats.getEndToEndDuration(statsCallback.getTriggerTimestamp()),
+                        subtaskStateStats.getEndToEndDuration(
+                                pendingCheckpointStats.getTriggerTimestamp()),
                         subtaskStateStats.getSyncCheckpointDuration(),
                         subtaskStateStats.getAsyncCheckpointDuration());
-                statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
+                pendingCheckpointStats.reportSubtaskStats(
+                        vertex.getJobvertexId(), subtaskStateStats);
             }
 
             return TaskAcknowledgeResult.SUCCESS;
@@ -541,11 +552,11 @@ public class PendingCheckpoint implements Checkpoint {
             CheckpointsCleaner checkpointsCleaner,
             Runnable postCleanup,
             Executor executor,
-            PendingCheckpointStats statsCallback) {
+            CheckpointStatsTracker statsTracker) {
         try {
             failureCause = new CheckpointException(reason, cause);
             onCompletionPromise.completeExceptionally(failureCause);
-            reportFailedCheckpoint(failureCause, statsCallback);
+            reportFailedCheckpoint(statsTracker, failureCause);
             assertAbortSubsumedForced(reason);
         } finally {
             dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -628,11 +639,11 @@ public class PendingCheckpoint implements Checkpoint {
      *
      * @param cause The failure cause or <code>null</code>.
      */
-    private void reportFailedCheckpoint(Exception cause, PendingCheckpointStats statsCallback) {
+    private void reportFailedCheckpoint(CheckpointStatsTracker statsTracker, Exception cause) {
         // to prevent null-pointers from concurrent modification, copy reference onto stack
-        if (statsCallback != null) {
-            long failureTimestamp = System.currentTimeMillis();
-            statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
+        if (pendingCheckpointStats != null) {
+            statsTracker.reportFailedCheckpoint(
+                    pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), cause));
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
index a575cf6..a8d74f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static java.util.stream.Collectors.toConcurrentMap;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Statistics for a pending checkpoint that is still in progress.
@@ -45,9 +44,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
 
     private static final long serialVersionUID = -973959257699390327L;
 
-    /** Tracker callback when the pending checkpoint is finalized or aborted. */
-    private final transient CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
-
     /** The current number of acknowledged subtasks. */
     private volatile int currentNumAcknowledgedSubtasks;
 
@@ -70,14 +66,12 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
      * @param triggerTimestamp Timestamp when the checkpoint was triggered.
      * @param props Checkpoint properties of the checkpoint.
      * @param taskStats Task stats for each involved operator.
-     * @param trackerCallback Callback for the {@link CheckpointStatsTracker}.
      */
     PendingCheckpointStats(
             long checkpointId,
             long triggerTimestamp,
             CheckpointProperties props,
-            Map<JobVertexID, Integer> taskStats,
-            CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) {
+            Map<JobVertexID, Integer> taskStats) {
         this(
                 checkpointId,
                 triggerTimestamp,
@@ -87,8 +81,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
                         .collect(
                                 toConcurrentMap(
                                         Map.Entry::getKey,
-                                        e -> new TaskStateStats(e.getKey(), e.getValue()))),
-                trackerCallback);
+                                        e -> new TaskStateStats(e.getKey(), e.getValue()))));
     }
 
     /**
@@ -99,15 +92,13 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
      * @param props Checkpoint properties of the checkpoint.
      * @param totalSubtaskCount Total number of subtasks for the checkpoint.
      * @param taskStats Task stats for each involved operator.
-     * @param trackerCallback Callback for the {@link CheckpointStatsTracker}.
      */
     PendingCheckpointStats(
             long checkpointId,
             long triggerTimestamp,
             CheckpointProperties props,
             int totalSubtaskCount,
-            Map<JobVertexID, TaskStateStats> taskStats,
-            CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) {
+            Map<JobVertexID, TaskStateStats> taskStats) {
         this(
                 checkpointId,
                 triggerTimestamp,
@@ -115,7 +106,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
                 totalSubtaskCount,
                 0,
                 taskStats,
-                trackerCallback,
                 0,
                 0,
                 0,
@@ -130,7 +120,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
             int totalSubtaskCount,
             int acknowledgedSubtaskCount,
             Map<JobVertexID, TaskStateStats> taskStats,
-            CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback,
             long currentCheckpointedSize,
             long currentStateSize,
             long processedData,
@@ -138,7 +127,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
             @Nullable SubtaskStateStats latestAcknowledgedSubtask) {
 
         super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
-        this.trackerCallback = checkNotNull(trackerCallback);
         this.currentCheckpointedSize = currentCheckpointedSize;
         this.currentStateSize = currentStateSize;
         this.currentProcessedData = processedData;
@@ -220,31 +208,20 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
         }
     }
 
-    /**
-     * Reports a successfully completed pending checkpoint.
-     *
-     * @param externalPointer Optional external storage path if checkpoint was externalized.
-     * @return Callback for the {@link CompletedCheckpoint} instance to notify about disposal.
-     */
-    CompletedCheckpointStats.DiscardCallback reportCompletedCheckpoint(String externalPointer) {
-        CompletedCheckpointStats completed =
-                new CompletedCheckpointStats(
-                        checkpointId,
-                        triggerTimestamp,
-                        props,
-                        numberOfSubtasks,
-                        new HashMap<>(taskStats),
-                        currentNumAcknowledgedSubtasks,
-                        currentCheckpointedSize,
-                        currentStateSize,
-                        currentProcessedData,
-                        currentPersistedData,
-                        latestAcknowledgedSubtask,
-                        externalPointer);
-
-        trackerCallback.reportCompletedCheckpoint(completed);
-
-        return completed.getDiscardCallback();
+    CompletedCheckpointStats toCompletedCheckpointStats(String externalPointer) {
+        return new CompletedCheckpointStats(
+                checkpointId,
+                triggerTimestamp,
+                props,
+                numberOfSubtasks,
+                new HashMap<>(taskStats),
+                currentNumAcknowledgedSubtasks,
+                currentCheckpointedSize,
+                currentStateSize,
+                currentProcessedData,
+                currentPersistedData,
+                latestAcknowledgedSubtask,
+                externalPointer);
     }
 
     /**
@@ -253,24 +230,21 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
      * @param failureTimestamp Timestamp of the failure.
      * @param cause Optional cause of the failure.
      */
-    void reportFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) {
-        FailedCheckpointStats failed =
-                new FailedCheckpointStats(
-                        checkpointId,
-                        triggerTimestamp,
-                        props,
-                        numberOfSubtasks,
-                        new HashMap<>(taskStats),
-                        currentNumAcknowledgedSubtasks,
-                        currentCheckpointedSize,
-                        currentStateSize,
-                        currentProcessedData,
-                        currentPersistedData,
-                        failureTimestamp,
-                        latestAcknowledgedSubtask,
-                        cause);
-
-        trackerCallback.reportFailedCheckpoint(failed);
+    FailedCheckpointStats toFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) {
+        return new FailedCheckpointStats(
+                checkpointId,
+                triggerTimestamp,
+                props,
+                numberOfSubtasks,
+                new HashMap<>(taskStats),
+                currentNumAcknowledgedSubtasks,
+                currentCheckpointedSize,
+                currentStateSize,
+                currentProcessedData,
+                currentPersistedData,
+                failureTimestamp,
+                latestAcknowledgedSubtask,
+                cause);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 3dfebab..ce90895 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -296,7 +296,8 @@ public class CheckpointCoordinatorMasterHooksTest {
                         masterHookStates,
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                         .addJobVertex(new JobVertexID())
@@ -356,7 +357,8 @@ public class CheckpointCoordinatorMasterHooksTest {
                         masterHookStates,
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 417473e..3b83973 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -773,7 +773,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         Collections.<MasterState>emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         // set up the coordinator and validate the initial state
         SharedStateRegistry sharedStateRegistry =
@@ -1089,7 +1090,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
         completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                 completedCheckpoint, new CheckpointsCleaner(), () -> {});
 
@@ -1131,7 +1133,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
         completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                 completedCheckpoint, new CheckpointsCleaner(), () -> {});
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9297c54..2cfa183 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2821,7 +2821,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         Collections.<MasterState>emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation()),
+                        new TestCompletedCheckpointStorageLocation(),
+                        null),
                 new CheckpointsCleaner(),
                 () -> {});
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 44696cb..6239892 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -239,7 +239,8 @@ public class CheckpointStateRestoreTest {
                         Collections.<MasterState>emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         coord.getCheckpointStore()
                 .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
@@ -269,7 +270,8 @@ public class CheckpointStateRestoreTest {
                         Collections.<MasterState>emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         coord.getCheckpointStore()
                 .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index c11e572..8e7b0bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -69,7 +69,7 @@ public class CheckpointStatsTrackerTest {
         pending.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
         pending.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
 
-        pending.reportCompletedCheckpoint(null);
+        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
 
         CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
         // History should be empty
@@ -118,7 +118,7 @@ public class CheckpointStatsTrackerTest {
         completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
         completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
 
-        completed1.reportCompletedCheckpoint(null);
+        tracker.reportCompletedCheckpoint(completed1.toCompletedCheckpointStats(null));
 
         // Failed checkpoint
         PendingCheckpointStats failed =
@@ -129,7 +129,7 @@ public class CheckpointStatsTrackerTest {
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         vertexToDop);
 
-        failed.reportFailedCheckpoint(12, null);
+        tracker.reportFailedCheckpoint(failed.toFailedCheckpoint(12, null));
 
         // Completed savepoint
         PendingCheckpointStats savepoint =
@@ -143,7 +143,7 @@ public class CheckpointStatsTrackerTest {
         savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
         savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
 
-        savepoint.reportCompletedCheckpoint(null);
+        tracker.reportCompletedCheckpoint(savepoint.toCompletedCheckpointStats(null));
 
         // In Progress
         PendingCheckpointStats inProgress =
@@ -242,7 +242,7 @@ public class CheckpointStatsTrackerTest {
         assertEquals(snapshot2, tracker.createSnapshot());
 
         // Complete checkpoint => new snapshot
-        pending.reportCompletedCheckpoint(null);
+        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
 
         CheckpointStatsSnapshot snapshot3 = tracker.createSnapshot();
         assertNotEquals(snapshot2, snapshot3);
@@ -422,7 +422,7 @@ public class CheckpointStatsTrackerTest {
 
         assertTrue(pending.reportSubtaskStats(jobVertexID, subtaskStats));
 
-        pending.reportCompletedCheckpoint(externalPath);
+        stats.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
 
         // Verify completed checkpoint updated
         assertEquals(Long.valueOf(1), numCheckpoints.getValue());
@@ -446,7 +446,7 @@ public class CheckpointStatsTrackerTest {
                         singletonMap(jobVertexID, 1));
 
         long failureTimestamp = 1230123L;
-        nextPending.reportFailedCheckpoint(failureTimestamp, null);
+        stats.reportFailedCheckpoint(nextPending.toFailedCheckpoint(failureTimestamp, null));
 
         // Verify updated
         assertEquals(Long.valueOf(2), numCheckpoints.getValue());
@@ -482,7 +482,7 @@ public class CheckpointStatsTrackerTest {
                         singletonMap(jobVertexID, 1));
 
         thirdPending.reportSubtaskStats(jobVertexID, subtaskStats);
-        thirdPending.reportCompletedCheckpoint(null);
+        stats.reportCompletedCheckpoint(thirdPending.toCompletedCheckpointStats(null));
 
         // Verify external path is "n/a", because internal checkpoint won't generate external path.
         assertEquals("n/a", latestCompletedExternalPath.getValue());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 122773a..62e5479 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -307,7 +307,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
                     operatorGroupState,
                     null,
                     props,
-                    new TestCompletedCheckpointStorageLocation());
+                    new TestCompletedCheckpointStorageLocation(),
+                    null);
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index c7cad82..3a29452 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -63,7 +63,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         CompletedCheckpoint checkpoint2 =
                 new CompletedCheckpoint(
@@ -75,7 +76,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
         checkpoints1.add(checkpoint1);
@@ -103,7 +105,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         CompletedCheckpoint checkpoint2 =
                 new CompletedCheckpoint(
@@ -115,7 +118,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
         checkpoints1.add(checkpoint1);
@@ -145,7 +149,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         CompletedCheckpoint checkpoint2 =
                 new CompletedCheckpoint(
@@ -157,7 +162,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
         checkpoints1.add(checkpoint1);
@@ -184,7 +190,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         CompletedCheckpoint checkpoint2 =
                 new CompletedCheckpoint(
@@ -196,7 +203,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
         checkpoints1.add(checkpoint1);
@@ -223,7 +231,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
@@ -254,7 +263,8 @@ public class CompletedCheckpointTest {
                         operatorStates,
                         Collections.emptyList(),
                         props,
-                        location);
+                        location,
+                        null);
 
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
@@ -306,7 +316,8 @@ public class CompletedCheckpointTest {
                             new HashMap<>(operatorStates),
                             Collections.emptyList(),
                             retainProps,
-                            retainedLocation);
+                            retainedLocation,
+                            null);
 
             checkpoint.discardOnShutdown(status);
 
@@ -333,7 +344,8 @@ public class CompletedCheckpointTest {
                             new HashMap<>(operatorStates),
                             Collections.emptyList(),
                             discardProps,
-                            discardLocation);
+                            discardLocation,
+                            null);
 
             checkpoint.discardOnShutdown(status);
 
@@ -346,6 +358,23 @@ public class CompletedCheckpointTest {
     /** Tests that the stats callbacks happen if the callback is registered. */
     @Test
     public void testCompletedCheckpointStatsCallbacks() throws Exception {
+        Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+        JobVertexID jobVertexId = new JobVertexID();
+        taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1));
+        CompletedCheckpointStats checkpointStats =
+                new CompletedCheckpointStats(
+                        1,
+                        0,
+                        CheckpointProperties.forCheckpoint(
+                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                        1,
+                        taskStats,
+                        1,
+                        1,
+                        1,
+                        1,
+                        mock(SubtaskStateStats.class),
+                        null);
         CompletedCheckpoint completed =
                 new CompletedCheckpoint(
                         new JobID(),
@@ -356,14 +385,11 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
-
-        CompletedCheckpointStats.DiscardCallback callback =
-                mock(CompletedCheckpointStats.DiscardCallback.class);
-        completed.setDiscardCallback(callback);
+                        new TestCompletedCheckpointStorageLocation(),
+                        checkpointStats);
 
         completed.discardOnShutdown(JobStatus.FINISHED);
-        verify(callback, times(1)).notifyDiscardedCheckpoint();
+        assertTrue(checkpointStats.isDiscarded());
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index ed36db9..064e26a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -418,7 +418,8 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
                 Collections.emptyMap(),
                 Collections.emptyList(),
                 props,
-                new TestCompletedCheckpointStorageLocation());
+                new TestCompletedCheckpointStorageLocation(),
+                null);
     }
 
     private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> createStateHandles(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
index d7abd6e..82db6ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
@@ -58,7 +58,8 @@ public class DefaultCompletedCheckpointStoreUtilsTest extends TestLogger {
                 new HashMap<>(),
                 Collections.emptyList(),
                 CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                new TestCompletedCheckpointStorageLocation());
+                new TestCompletedCheckpointStorageLocation(),
+                null);
     }
 
     private static class FailingRetrievableStateHandle<T extends Serializable>
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index 37680ba..e46f608 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -52,17 +52,9 @@ public class PendingCheckpointStatsTest {
         taskStats.put(task1.getJobVertexId(), task1);
         taskStats.put(task2.getJobVertexId(), task2);
 
-        CheckpointStatsTracker.PendingCheckpointStatsCallback callback =
-                mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class);
-
         PendingCheckpointStats pending =
                 new PendingCheckpointStats(
-                        checkpointId,
-                        triggerTimestamp,
-                        props,
-                        totalSubtaskCount,
-                        taskStats,
-                        callback);
+                        checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
 
         // Check initial state
         assertEquals(checkpointId, pending.getCheckpointId());
@@ -129,8 +121,7 @@ public class PendingCheckpointStatsTest {
         taskStats.put(task1.getJobVertexId(), task1);
         taskStats.put(task2.getJobVertexId(), task2);
 
-        CheckpointStatsTracker.PendingCheckpointStatsCallback callback =
-                mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class);
+        CheckpointStatsTracker callback = mock(CheckpointStatsTracker.class);
 
         PendingCheckpointStats pending =
                 new PendingCheckpointStats(
@@ -139,8 +130,7 @@ public class PendingCheckpointStatsTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
-                        taskStats,
-                        callback);
+                        taskStats);
 
         // Report subtasks
         for (int i = 0; i < task1.getNumberOfSubtasks(); i++) {
@@ -154,8 +144,7 @@ public class PendingCheckpointStatsTest {
         // Report completed
         String externalPath = "asdjkasdjkasd";
 
-        CompletedCheckpointStats.DiscardCallback discardCallback =
-                pending.reportCompletedCheckpoint(externalPath);
+        callback.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
 
         ArgumentCaptor<CompletedCheckpointStats> args =
                 ArgumentCaptor.forClass(CompletedCheckpointStats.class);
@@ -166,7 +155,7 @@ public class PendingCheckpointStatsTest {
         assertNotNull(completed);
         assertEquals(CheckpointStatsStatus.COMPLETED, completed.getStatus());
         assertFalse(completed.isDiscarded());
-        discardCallback.notifyDiscardedCheckpoint();
+        completed.discard();
         assertTrue(completed.isDiscarded());
         assertEquals(externalPath, completed.getExternalPath());
 
@@ -194,8 +183,7 @@ public class PendingCheckpointStatsTest {
         taskStats.put(task1.getJobVertexId(), task1);
         taskStats.put(task2.getJobVertexId(), task2);
 
-        CheckpointStatsTracker.PendingCheckpointStatsCallback callback =
-                mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class);
+        CheckpointStatsTracker callback = mock(CheckpointStatsTracker.class);
 
         long triggerTimestamp = 123123;
         PendingCheckpointStats pending =
@@ -205,8 +193,7 @@ public class PendingCheckpointStatsTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
-                        taskStats,
-                        callback);
+                        taskStats);
 
         // Report subtasks
         for (int i = 0; i < task1.getNumberOfSubtasks(); i++) {
@@ -220,7 +207,7 @@ public class PendingCheckpointStatsTest {
         // Report failed
         Exception cause = new Exception("test exception");
         long failureTimestamp = 112211137;
-        pending.reportFailedCheckpoint(failureTimestamp, cause);
+        callback.reportFailedCheckpoint(pending.toFailedCheckpoint(failureTimestamp, cause));
 
         ArgumentCaptor<FailedCheckpointStats> args =
                 ArgumentCaptor.forClass(FailedCheckpointStats.class);
@@ -263,8 +250,7 @@ public class PendingCheckpointStatsTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         1337,
-                        taskStats,
-                        mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class));
+                        taskStats);
 
         PendingCheckpointStats copy = CommonTestUtils.createCopySerializable(pending);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 05111d6..294c6fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
@@ -74,9 +73,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -218,7 +215,7 @@ public class PendingCheckpointTest {
         future = pending.getCompletionFuture();
 
         assertFalse(future.isDone());
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         assertTrue(pending.areTasksFullyAcknowledged());
         pending.finalizeCheckpoint(
                 new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
@@ -292,75 +289,6 @@ public class PendingCheckpointTest {
         verify(state, times(1)).discardState();
     }
 
-    /** Tests that the stats callbacks happen if the callback is registered. */
-    @Test
-    public void testPendingCheckpointStatsCallbacks() throws Exception {
-        {
-            // Complete successfully
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), callback);
-            verify(callback, times(1))
-                    .reportSubtaskStats(nullable(JobVertexID.class), any(SubtaskStateStats.class));
-
-            pending.finalizeCheckpoint(
-                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), callback);
-            verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED, callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED, callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
-        }
-    }
-
     /**
      * FLINK-5985.
      *
@@ -373,7 +301,7 @@ public class PendingCheckpointTest {
                 createPendingCheckpoint(
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-        pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class), null);
+        pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
         final OperatorState expectedState =
                 new OperatorState(OPERATOR_ID, PARALLELISM, MAX_PARALLELISM);
         Assert.assertEquals(
@@ -394,7 +322,7 @@ public class PendingCheckpointTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
         pending.acknowledgeTask(
-                ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class), null);
+                ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class));
         Assert.assertFalse(pending.getOperatorStates().isEmpty());
     }
 
@@ -441,7 +369,7 @@ public class PendingCheckpointTest {
         assertTrue(pending.areMasterStatesFullyAcknowledged());
         assertFalse(pending.areTasksFullyAcknowledged());
 
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         assertTrue(pending.areTasksFullyAcknowledged());
 
         final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -494,7 +422,7 @@ public class PendingCheckpointTest {
         assertTrue(pending.areMasterStatesFullyAcknowledged());
         assertFalse(pending.areTasksFullyAcknowledged());
 
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         assertTrue(pending.areTasksFullyAcknowledged());
 
         final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -580,8 +508,7 @@ public class PendingCheckpointTest {
         checkpoint.acknowledgeTask(
                 ACK_TASKS.get(0).getAttemptId(),
                 TaskStateSnapshot.FINISHED_ON_RESTORE,
-                new CheckpointMetrics(),
-                null);
+                new CheckpointMetrics());
         assertThat(
                 recordCheckpointPlan.getReportedFinishedOnRestoreTasks(),
                 contains(ACK_TASKS.get(0).getVertex()));
@@ -595,8 +522,7 @@ public class PendingCheckpointTest {
         checkpoint.acknowledgeTask(
                 ACK_TASKS.get(0).getAttemptId(),
                 new TaskStateSnapshot(10, true),
-                new CheckpointMetrics(),
-                null);
+                new CheckpointMetrics());
         assertThat(
                 recordCheckpointPlan.getReportedOperatorsFinishedTasks(),
                 contains(ACK_TASKS.get(0).getVertex()));
@@ -648,7 +574,7 @@ public class PendingCheckpointTest {
                         Collections.emptyList(),
                         Executors.directExecutor());
 
-        checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+        checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         return checkpoint;
     }
 
@@ -718,7 +644,8 @@ public class PendingCheckpointTest {
                 masterStateIdentifiers,
                 props,
                 location,
-                new CompletableFuture<>());
+                new CompletableFuture<>(),
+                null);
     }
 
     @SuppressWarnings("unchecked")
@@ -741,12 +668,7 @@ public class PendingCheckpointTest {
             CheckpointFailureReason reason,
             PendingCheckpointStats statsCallback) {
         checkpoint.abort(
-                reason,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                Executors.directExecutor(),
-                statsCallback);
+                reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
     }
 
     private static final class QueueExecutor implements Executor {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index d36b321..9a584c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -107,7 +107,8 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
                             Collections.emptyMap(),
                             Collections.emptyList(),
                             CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
-                            new TestCompletedCheckpointStorageLocation()) {
+                            new TestCompletedCheckpointStorageLocation(),
+                            null) {
                         @Override
                         public boolean discardOnSubsume() {
                             discardAttempted.countDown();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index ee4f46c..804d285 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -254,7 +254,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
                             Collections.emptyMap(),
                             Collections.emptyList(),
                             CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
-                            new TestCompletedCheckpointStorageLocation());
+                            new TestCompletedCheckpointStorageLocation(),
+                            null);
             // shouldn't fail despite the exception
             store.addCheckpointAndSubsumeOldestOne(
                     checkpointToAdd,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 50bac5c..50a9700 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -880,7 +880,8 @@ public class JobMasterTest extends TestLogger {
                         null,
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new DummyCheckpointStorageLocation());
+                        new DummyCheckpointStorageLocation(),
+                        null);
 
         final StandaloneCompletedCheckpointStore completedCheckpointStore =
                 new StandaloneCompletedCheckpointStore(1);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 641a918..dfc3b40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -150,7 +150,8 @@ public class SchedulerUtilsTest extends TestLogger {
                 singletonMap(operatorID, operatorState),
                 emptyList(),
                 CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
-                new TestCompletedCheckpointStorageLocation());
+                new TestCompletedCheckpointStorageLocation(),
+                null);
     }
 
     private IncrementalRemoteKeyedStateHandle buildIncrementalHandle(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
index f6db13c..cab4abe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
@@ -223,6 +223,7 @@ public class StopWithSavepointTerminationHandlerImplTest extends TestLogger {
                 new HashMap<>(),
                 null,
                 CheckpointProperties.forSavepoint(true, SavepointFormatType.CANONICAL),
-                new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"));
+                new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"),
+                null);
     }
 }