You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 09:58:20 UTC

[flink] branch master updated (f57a537 -> fbfdb0e)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f57a537  [FLINK-25833][core] FlinkUserCodeClassLoader is registered as parallel capable
     new 1e7d45d  [FLINK-25958][refactor][runtime] Separated the logic of creating and reporting the statistic in order to use it in different place in the future
     new dc419b5  [FLINK-25958][runtime] Mark CompletedCheckpoint as discarded before it will be really discarded in order to avoid synchronization for changing discarded flag
     new 906f324  [FLINK-25958][runtime] Report completed statistic only after the completed checkpoint will be added to checkpoint store
     new fbfdb0e  [FLINK-25958][runtime] Report failed statistic if adding of completed checkpoint to checkpoint store fails

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/checkpoint/Checkpoint.java       |  14 ++-
 .../runtime/checkpoint/CheckpointCoordinator.java  |  77 +++++++++---
 .../runtime/checkpoint/CheckpointStatsTracker.java |  43 +------
 .../flink/runtime/checkpoint/Checkpoints.java      |   3 +-
 .../runtime/checkpoint/CheckpointsCleaner.java     |  20 +--
 .../runtime/checkpoint/CompletedCheckpoint.java    | 138 ++++++++++-----------
 .../checkpoint/CompletedCheckpointStats.java       |  26 +---
 .../EmbeddedCompletedCheckpointStore.java          |   4 +-
 .../runtime/checkpoint/FailedCheckpointStats.java  |  17 ---
 .../runtime/checkpoint/PendingCheckpoint.java      | 132 +++++++++++---------
 .../runtime/checkpoint/PendingCheckpointStats.java |  90 +++++---------
 .../StandaloneCompletedCheckpointStore.java        |  10 +-
 .../CheckpointCoordinatorFailureTest.java          |  31 ++++-
 .../CheckpointCoordinatorMasterHooksTest.java      |   6 +-
 .../CheckpointCoordinatorRestoringTest.java        |   9 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |  34 ++++-
 .../checkpoint/CheckpointStateRestoreTest.java     |   6 +-
 .../checkpoint/CheckpointStatsTrackerTest.java     |  16 +--
 .../checkpoint/CompletedCheckpointStoreTest.java   |  49 +++-----
 .../checkpoint/CompletedCheckpointTest.java        |  70 +++++++----
 .../DefaultCompletedCheckpointStoreTest.java       |   3 +-
 .../DefaultCompletedCheckpointStoreUtilsTest.java  |   3 +-
 .../checkpoint/PendingCheckpointStatsTest.java     |  32 ++---
 .../runtime/checkpoint/PendingCheckpointTest.java  | 105 ++--------------
 .../StandaloneCompletedCheckpointStoreTest.java    |   5 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     |   3 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   3 +-
 .../runtime/scheduler/SchedulerUtilsTest.java      |   3 +-
 ...topWithSavepointTerminationHandlerImplTest.java |   3 +-
 29 files changed, 450 insertions(+), 505 deletions(-)

[flink] 02/04: [FLINK-25958][runtime] Mark CompletedCheckpoint as discarded before it will be really discarded in order to avoid synchronization for changing discarded flag

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc419b5639f68bcb0b773763f24179dd3536d713
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:45:26 2022 +0100

    [FLINK-25958][runtime] Mark CompletedCheckpoint as discarded before it will be really discarded in order to avoid synchronization for changing discarded flag
---
 .../flink/runtime/checkpoint/Checkpoint.java       |  14 ++-
 .../runtime/checkpoint/CheckpointsCleaner.java     |  20 +---
 .../runtime/checkpoint/CompletedCheckpoint.java    | 125 ++++++++++-----------
 .../checkpoint/CompletedCheckpointStats.java       |   4 +-
 .../EmbeddedCompletedCheckpointStore.java          |   4 +-
 .../runtime/checkpoint/PendingCheckpoint.java      |  69 +++++++-----
 .../StandaloneCompletedCheckpointStore.java        |  10 +-
 .../checkpoint/CompletedCheckpointStoreTest.java   |  46 +++-----
 .../checkpoint/CompletedCheckpointTest.java        |   8 +-
 .../StandaloneCompletedCheckpointStoreTest.java    |   2 +-
 10 files changed, 153 insertions(+), 149 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java
index 4e654ba..b897287 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java
@@ -19,7 +19,19 @@ package org.apache.flink.runtime.checkpoint;
 
 /** A checkpoint, pending or completed. */
 public interface Checkpoint {
+    DiscardObject NOOP_DISCARD_OBJECT = () -> {};
+
     long getCheckpointID();
 
-    void discard() throws Exception;
+    /**
+     * This method precedes the {@link DiscardObject#discard()} method and should be called from
+     * the {@link CheckpointCoordinator} (under the lock), while
+     * {@link DiscardObject#discard()} can be called from any thread or place.
+     */
+    DiscardObject markAsDiscarded();
+
+    /** Extra interface for discarding the checkpoint. */
+    interface DiscardObject {
+        void discard() throws Exception;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
index 506cb6d..08161dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
@@ -59,24 +59,16 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync {
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        cleanup(
-                checkpoint,
-                () -> {
-                    if (shouldDiscard) {
-                        checkpoint.discard();
-                    }
-                },
-                postCleanAction,
-                executor);
+        Checkpoint.DiscardObject discardObject =
+                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
+
+        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
     public void cleanCheckpointOnFailedStoring(
             CompletedCheckpoint completedCheckpoint, Executor executor) {
-        cleanup(
-                completedCheckpoint,
-                completedCheckpoint::discardOnFailedStoring,
-                () -> {},
-                executor);
+        Checkpoint.DiscardObject discardObject = completedCheckpoint.markAsDiscarded();
+        cleanup(completedCheckpoint, discardObject::discard, () -> {}, executor);
     }
 
     private void cleanup(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 841e543..41c5774 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -43,6 +44,7 @@ import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A CompletedCheckpoint describes a checkpoint after all required tasks acknowledged it (with their
@@ -66,6 +68,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * metadata file. For a state backend that stores metadata in database tables, the pointer could be
  * the table name and row key. The pointer is encoded as a String.
  */
+@NotThreadSafe
 public class CompletedCheckpoint implements Serializable, Checkpoint {
 
     private static final Logger LOG = LoggerFactory.getLogger(CompletedCheckpoint.class);
@@ -216,77 +219,20 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
     //  Discard and Dispose
     // ------------------------------------------------------------------------
 
-    public void discardOnFailedStoring() throws Exception {
-        discard();
-    }
-
-    public boolean discardOnSubsume() throws Exception {
-        if (shouldBeDiscardedOnSubsume()) {
-            discard();
-            return true;
+    public DiscardObject markAsDiscarded() {
+        if (completedCheckpointStats != null) {
+            completedCheckpointStats.discard();
         }
 
-        return false;
-    }
-
-    public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
-
-        if (shouldBeDiscardedOnShutdown(jobStatus)) {
-
-            discard();
-            return true;
-        } else {
-            LOG.info("Checkpoint with ID {} at '{}' not discarded.", checkpointID, externalPointer);
-            return false;
-        }
+        return new CompletedCheckpointDiscardObject();
     }
 
-    @Override
-    public void discard() throws Exception {
-        LOG.trace("Executing discard procedure for {}.", this);
-
-        try {
-            // collect exceptions and continue cleanup
-            Exception exception = null;
-
-            // drop the metadata
-            try {
-                metadataHandle.discardState();
-            } catch (Exception e) {
-                exception = e;
-            }
-
-            // discard private state objects
-            try {
-                StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-            } catch (Exception e) {
-                exception = ExceptionUtils.firstOrSuppressed(e, exception);
-            }
-
-            // discard location as a whole
-            try {
-                storageLocation.disposeStorageLocation();
-            } catch (Exception e) {
-                exception = ExceptionUtils.firstOrSuppressed(e, exception);
-            }
-
-            if (exception != null) {
-                throw exception;
-            }
-        } finally {
-            operatorStates.clear();
-
-            if (completedCheckpointStats != null) {
-                completedCheckpointStats.discard();
-            }
-        }
+    public DiscardObject markAsDiscardedOnSubsume() {
+        return shouldBeDiscardedOnSubsume() ? markAsDiscarded() : NOOP_DISCARD_OBJECT;
     }
 
-    /** NOT Thread safe. This method can be called only from CheckpointCoordinator thread. */
-    public void markAsDiscarded() {
-        if (completedCheckpointStats != null) {
-            completedCheckpointStats.discard();
-        }
+    public DiscardObject markAsDiscardedOnShutdown(JobStatus jobStatus) {
+        return shouldBeDiscardedOnShutdown(jobStatus) ? markAsDiscarded() : NOOP_DISCARD_OBJECT;
     }
 
     public boolean shouldBeDiscardedOnSubsume() {
@@ -338,4 +284,53 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
                 "%s %d @ %d for %s located at %s",
                 props.getCheckpointType().getName(), checkpointID, timestamp, job, externalPointer);
     }
+
+    /** Implementation of {@link org.apache.flink.runtime.checkpoint.Checkpoint.DiscardObject}. */
+    @NotThreadSafe
+    public class CompletedCheckpointDiscardObject implements DiscardObject {
+
+        @Override
+        public void discard() throws Exception {
+            LOG.trace("Executing discard procedure for {}.", this);
+            checkState(
+                    isMarkedAsDiscarded(),
+                    "Checkpoint should be marked as discarded before discard.");
+
+            try {
+                // collect exceptions and continue cleanup
+                Exception exception = null;
+
+                // drop the metadata
+                try {
+                    metadataHandle.discardState();
+                } catch (Exception e) {
+                    exception = e;
+                }
+
+                // discard private state objects
+                try {
+                    StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+                } catch (Exception e) {
+                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
+                }
+
+                // discard location as a whole
+                try {
+                    storageLocation.disposeStorageLocation();
+                } catch (Exception e) {
+                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
+                }
+
+                if (exception != null) {
+                    throw exception;
+                }
+            } finally {
+                operatorStates.clear();
+            }
+        }
+
+        private boolean isMarkedAsDiscarded() {
+            return completedCheckpointStats == null || completedCheckpointStats.isDiscarded();
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
index 88339ac..06cc8a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -182,9 +182,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
         return discarded;
     }
 
-    /**
-     * Mark the checkpoint has been discarded.
-     */
+    /** Marks the checkpoint as discarded. */
     void discard() {
         discarded = true;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index 653ecf1..1e5e47c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -90,7 +90,7 @@ public class EmbeddedCompletedCheckpointStore extends AbstractCompleteCheckpoint
                 CheckpointSubsumeHelper.subsume(
                                 checkpoints,
                                 maxRetainedCheckpoints,
-                                CompletedCheckpoint::discardOnSubsume)
+                                cc -> cc.markAsDiscardedOnSubsume().discard())
                         .orElse(null);
 
         unregisterUnusedState(checkpoints);
@@ -101,7 +101,7 @@ public class EmbeddedCompletedCheckpointStore extends AbstractCompleteCheckpoint
     @VisibleForTesting
     void removeOldestCheckpoint() throws Exception {
         CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-        checkpointToSubsume.discardOnSubsume();
+        checkpointToSubsume.markAsDiscardedOnSubsume().discard();
         unregisterUnusedState(checkpoints);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index aee95dd..76c6a44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -64,6 +65,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>Note that the pending checkpoint, as well as the successful checkpoint keep the state handles
  * always as serialized values, never as actual values.
  */
+@NotThreadSafe
 public class PendingCheckpoint implements Checkpoint {
 
     /** Result of the {@link PendingCheckpoint#acknowledgedTasks} method. */
@@ -591,35 +593,9 @@ public class PendingCheckpoint implements Checkpoint {
         }
     }
 
-    /**
-     * Discard state. Must be called after {@link #dispose(boolean, CheckpointsCleaner, Runnable,
-     * Executor) dispose}.
-     */
     @Override
-    public void discard() {
-        synchronized (lock) {
-            if (discarded) {
-                Preconditions.checkState(
-                        disposed, "Checkpoint should be disposed before being discarded");
-                return;
-            } else {
-                discarded = true;
-            }
-        }
-        // discard the private states.
-        // unregistered shared states are still considered private at this point.
-        try {
-            StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-            targetLocation.disposeOnFailure();
-        } catch (Throwable t) {
-            LOG.warn(
-                    "Could not properly dispose the private states in the pending checkpoint {} of job {}.",
-                    checkpointId,
-                    jobId,
-                    t);
-        } finally {
-            operatorStates.clear();
-        }
+    public DiscardObject markAsDiscarded() {
+        return new PendingCheckpointDiscardObject();
     }
 
     private void cancelCanceller() {
@@ -660,4 +636,41 @@ public class PendingCheckpoint implements Checkpoint {
                 getNumberOfAcknowledgedTasks(),
                 getNumberOfNonAcknowledgedTasks());
     }
+
+    /**
+     * Implementation of {@link org.apache.flink.runtime.checkpoint.Checkpoint.DiscardObject} for
+     * {@link PendingCheckpoint}.
+     */
+    public class PendingCheckpointDiscardObject implements DiscardObject {
+        /**
+         * Discard state. Must be called after {@link #dispose(boolean, CheckpointsCleaner,
+         * Runnable, Executor) dispose}.
+         */
+        @Override
+        public void discard() {
+            synchronized (lock) {
+                if (discarded) {
+                    Preconditions.checkState(
+                            disposed, "Checkpoint should be disposed before being discarded");
+                    return;
+                } else {
+                    discarded = true;
+                }
+            }
+            // discard the private states.
+            // unregistered shared states are still considered private at this point.
+            try {
+                StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+                targetLocation.disposeOnFailure();
+            } catch (Throwable t) {
+                LOG.warn(
+                        "Could not properly dispose the private states in the pending checkpoint {} of job {}.",
+                        checkpointId,
+                        jobId,
+                        t);
+            } finally {
+                operatorStates.clear();
+            }
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 9a1d1095..87a6486 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -101,7 +101,7 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompleteCheckpoi
                 CheckpointSubsumeHelper.subsume(
                                 checkpoints,
                                 maxNumberOfCheckpointsToRetain,
-                                CompletedCheckpoint::discardOnSubsume)
+                                cc -> cc.markAsDiscardedOnSubsume().discard())
                         .orElse(null);
 
         unregisterUnusedState(checkpoints);
@@ -133,7 +133,13 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompleteCheckpoi
 
             long lowestRetained = Long.MAX_VALUE;
             for (CompletedCheckpoint checkpoint : checkpoints) {
-                if (!checkpoint.discardOnShutdown(jobStatus)) {
+                if (checkpoint.shouldBeDiscardedOnShutdown(jobStatus)) {
+                    checkpoint.markAsDiscardedOnShutdown(jobStatus).discard();
+                } else {
+                    LOG.info(
+                            "Checkpoint with ID {} at '{}' not discarded.",
+                            checkpoint.getCheckpointID(),
+                            checkpoint.getExternalPointer());
                     lowestRetained = Math.min(checkpoint.getCheckpointID(), lowestRetained);
                 }
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 62e5479..39891d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -312,35 +312,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         }
 
         @Override
-        public boolean discardOnSubsume() throws Exception {
-            if (super.discardOnSubsume()) {
-                discard();
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        @Override
-        public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
-            if (super.discardOnShutdown(jobStatus)) {
-                discard();
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        @Override
-        public void discard() throws Exception {
-            super.discard();
-            if (!isDiscarded) {
-                this.isDiscarded = true;
-
-                if (discardLatch != null) {
-                    discardLatch.countDown();
-                }
-            }
+        public CompletedCheckpointDiscardObject markAsDiscarded() {
+            return new TestCompletedCheckpointDiscardObject();
         }
 
         public boolean isDiscarded() {
@@ -376,6 +349,21 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
         public int hashCode() {
             return getJobId().hashCode() + (int) getCheckpointID();
         }
+
+        /** Runs the real discard, then flags the checkpoint as discarded and counts down the latch once. */
+        public class TestCompletedCheckpointDiscardObject extends CompletedCheckpointDiscardObject {
+            @Override
+            public void discard() throws Exception {
+                super.discard();
+                if (!isDiscarded) {
+                    isDiscarded = true;
+
+                    if (discardLatch != null) {
+                        discardLatch.countDown();
+                    }
+                }
+            }
+        }
     }
 
     public static class TestOperatorSubtaskState extends OperatorSubtaskState {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 3a29452..b6461a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -271,7 +271,7 @@ public class CompletedCheckpointTest {
         verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
 
         // Subsume
-        checkpoint.discardOnSubsume();
+        checkpoint.markAsDiscardedOnSubsume().discard();
 
         verify(state, times(1)).discardState();
         assertTrue(location.isDisposed());
@@ -319,7 +319,7 @@ public class CompletedCheckpointTest {
                             retainedLocation,
                             null);
 
-            checkpoint.discardOnShutdown(status);
+            checkpoint.markAsDiscardedOnShutdown(status).discard();
 
             verify(state, times(0)).discardState();
             assertFalse(retainedLocation.isDisposed());
@@ -347,7 +347,7 @@ public class CompletedCheckpointTest {
                             discardLocation,
                             null);
 
-            checkpoint.discardOnShutdown(status);
+            checkpoint.markAsDiscardedOnShutdown(status).discard();
 
             verify(state, times(1)).discardState();
             assertTrue(discardLocation.isDisposed());
@@ -388,7 +388,7 @@ public class CompletedCheckpointTest {
                         new TestCompletedCheckpointStorageLocation(),
                         checkpointStats);
 
-        completed.discardOnShutdown(JobStatus.FINISHED);
+        completed.markAsDiscardedOnShutdown(JobStatus.FINISHED).discard();
         assertTrue(checkpointStats.isDiscarded());
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 9a584c9..f30df4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -110,7 +110,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
                             new TestCompletedCheckpointStorageLocation(),
                             null) {
                         @Override
-                        public boolean discardOnSubsume() {
+                        public CompletedCheckpointDiscardObject markAsDiscardedOnSubsume() {
                             discardAttempted.countDown();
                             throw new RuntimeException();
                         }

[flink] 01/04: [FLINK-25958][refactor][runtime] Separated the logic of creating and reporting the statistic in order to use it in different place in the future

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e7d45d53b7ea7b9cfadf2e293ba790f3a9e90c3
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:41:20 2022 +0100

    [FLINK-25958][refactor][runtime] Separated the logic of creating and reporting the statistic in order to use it in different place in the future
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  35 ++++----
 .../runtime/checkpoint/CheckpointStatsTracker.java |  43 +--------
 .../flink/runtime/checkpoint/Checkpoints.java      |   3 +-
 .../runtime/checkpoint/CompletedCheckpoint.java    |  31 ++++---
 .../checkpoint/CompletedCheckpointStats.java       |  24 +----
 .../runtime/checkpoint/FailedCheckpointStats.java  |  17 ----
 .../runtime/checkpoint/PendingCheckpoint.java      |  65 ++++++++------
 .../runtime/checkpoint/PendingCheckpointStats.java |  90 +++++++------------
 .../CheckpointCoordinatorMasterHooksTest.java      |   6 +-
 .../CheckpointCoordinatorRestoringTest.java        |   9 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |   3 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |   6 +-
 .../checkpoint/CheckpointStatsTrackerTest.java     |  16 ++--
 .../checkpoint/CompletedCheckpointStoreTest.java   |   3 +-
 .../checkpoint/CompletedCheckpointTest.java        |  62 +++++++++----
 .../DefaultCompletedCheckpointStoreTest.java       |   3 +-
 .../DefaultCompletedCheckpointStoreUtilsTest.java  |   3 +-
 .../checkpoint/PendingCheckpointStatsTest.java     |  32 ++-----
 .../runtime/checkpoint/PendingCheckpointTest.java  | 100 +++------------------
 .../StandaloneCompletedCheckpointStoreTest.java    |   3 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     |   3 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   3 +-
 .../runtime/scheduler/SchedulerUtilsTest.java      |   3 +-
 ...topWithSavepointTerminationHandlerImplTest.java |   3 +-
 24 files changed, 218 insertions(+), 348 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 335ea9a..5248538 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -779,6 +779,9 @@ public class CheckpointCoordinator {
             }
         }
 
+        PendingCheckpointStats pendingCheckpointStats =
+                trackPendingCheckpointStats(checkpointID, checkpointPlan, props, timestamp);
+
         final PendingCheckpoint checkpoint =
                 new PendingCheckpoint(
                         job,
@@ -789,9 +792,8 @@ public class CheckpointCoordinator {
                         masterHooks.keySet(),
                         props,
                         checkpointStorageLocation,
-                        onCompletionPromise);
-
-        trackPendingCheckpointStats(checkpoint);
+                        onCompletionPromise,
+                        pendingCheckpointStats);
 
         synchronized (lock) {
             pendingCheckpoints.put(checkpointID, checkpoint);
@@ -1122,8 +1124,7 @@ public class CheckpointCoordinator {
                 switch (checkpoint.acknowledgeTask(
                         message.getTaskExecutionId(),
                         message.getSubtaskState(),
-                        message.getCheckpointMetrics(),
-                        getStatsCallback(checkpoint))) {
+                        message.getCheckpointMetrics())) {
                     case SUCCESS:
                         LOG.debug(
                                 "Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
@@ -1313,7 +1314,7 @@ public class CheckpointCoordinator {
                             checkpointsCleaner,
                             this::scheduleTriggerRequest,
                             executor,
-                            getStatsCallback(pendingCheckpoint));
+                            statsTracker);
 
             failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
             return completedCheckpoint;
@@ -2041,7 +2042,7 @@ public class CheckpointCoordinator {
                         checkpointsCleaner,
                         this::scheduleTriggerRequest,
                         executor,
-                        getStatsCallback(pendingCheckpoint));
+                        statsTracker);
 
                 failureManager.handleCheckpointException(
                         pendingCheckpoint,
@@ -2180,11 +2181,15 @@ public class CheckpointCoordinator {
         SKIP;
     }
 
-    private void trackPendingCheckpointStats(PendingCheckpoint checkpoint) {
+    private PendingCheckpointStats trackPendingCheckpointStats(
+            long checkpointId,
+            CheckpointPlan checkpointPlan,
+            CheckpointProperties props,
+            long checkpointTimestamp) {
         Map<JobVertexID, Integer> vertices =
                 Stream.concat(
-                                checkpoint.getCheckpointPlan().getTasksToWaitFor().stream(),
-                                checkpoint.getCheckpointPlan().getFinishedTasks().stream())
+                                checkpointPlan.getTasksToWaitFor().stream(),
+                                checkpointPlan.getFinishedTasks().stream())
                         .map(Execution::getVertex)
                         .map(ExecutionVertex::getJobVertex)
                         .distinct()
@@ -2195,13 +2200,11 @@ public class CheckpointCoordinator {
 
         PendingCheckpointStats pendingCheckpointStats =
                 statsTracker.reportPendingCheckpoint(
-                        checkpoint.getCheckpointID(),
-                        checkpoint.getCheckpointTimestamp(),
-                        checkpoint.getProps(),
-                        vertices);
+                        checkpointId, checkpointTimestamp, props, vertices);
+
+        reportFinishedTasks(pendingCheckpointStats, checkpointPlan.getFinishedTasks());
 
-        reportFinishedTasks(
-                pendingCheckpointStats, checkpoint.getCheckpointPlan().getFinishedTasks());
+        return pendingCheckpointStats;
     }
 
     private void reportFinishedTasks(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index c691614..f10a668 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -160,12 +160,7 @@ public class CheckpointStatsTracker {
             Map<JobVertexID, Integer> vertexToDop) {
 
         PendingCheckpointStats pending =
-                new PendingCheckpointStats(
-                        checkpointId,
-                        triggerTimestamp,
-                        props,
-                        vertexToDop,
-                        PendingCheckpointStatsCallback.proxyFor(this));
+                new PendingCheckpointStats(checkpointId, triggerTimestamp, props, vertexToDop);
 
         statsReadWriteLock.lock();
         try {
@@ -204,7 +199,7 @@ public class CheckpointStatsTracker {
      *
      * @param completed The completed checkpoint stats.
      */
-    private void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+    void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
         statsReadWriteLock.lock();
         try {
             latestCompletedCheckpoint = completed;
@@ -225,7 +220,7 @@ public class CheckpointStatsTracker {
      *
      * @param failed The failed checkpoint stats.
      */
-    private void reportFailedCheckpoint(FailedCheckpointStats failed) {
+    void reportFailedCheckpoint(FailedCheckpointStats failed) {
         statsReadWriteLock.lock();
         try {
             counts.incrementFailedCheckpoints();
@@ -276,38 +271,6 @@ public class CheckpointStatsTracker {
         }
     }
 
-    /** Callback for finalization of a pending checkpoint. */
-    interface PendingCheckpointStatsCallback {
-        /**
-         * Report a completed checkpoint.
-         *
-         * @param completed The completed checkpoint.
-         */
-        void reportCompletedCheckpoint(CompletedCheckpointStats completed);
-
-        /**
-         * Report a failed checkpoint.
-         *
-         * @param failed The failed checkpoint.
-         */
-        void reportFailedCheckpoint(FailedCheckpointStats failed);
-
-        static PendingCheckpointStatsCallback proxyFor(
-                CheckpointStatsTracker checkpointStatsTracker) {
-            return new PendingCheckpointStatsCallback() {
-                @Override
-                public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
-                    checkpointStatsTracker.reportCompletedCheckpoint(completed);
-                }
-
-                @Override
-                public void reportFailedCheckpoint(FailedCheckpointStats failed) {
-                    checkpointStatsTracker.reportFailedCheckpoint(failed);
-                }
-            };
-        }
-    }
-
     // ------------------------------------------------------------------------
     // Metrics
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 7d27d50..cc97510 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -215,7 +215,8 @@ public class Checkpoints {
                 checkpointProperties,
                 restoreMode == RestoreMode.CLAIM
                         ? new ClaimModeCompletedStorageLocation(location)
-                        : location);
+                        : location,
+                null);
     }
 
     private static void throwNonRestoredStateException(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 1972427..841e543 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -104,8 +104,8 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
     /** External pointer to the completed checkpoint (for example file path). */
     private final String externalPointer;
 
-    /** Optional stats tracker callback for discard. */
-    @Nullable private transient volatile CompletedCheckpointStats.DiscardCallback discardCallback;
+    /** Completed statistic for managing discard marker. */
+    @Nullable private final transient CompletedCheckpointStats completedCheckpointStats;
 
     // ------------------------------------------------------------------------
 
@@ -117,7 +117,8 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
             Map<OperatorID, OperatorState> operatorStates,
             @Nullable Collection<MasterState> masterHookStates,
             CheckpointProperties props,
-            CompletedCheckpointStorageLocation storageLocation) {
+            CompletedCheckpointStorageLocation storageLocation,
+            @Nullable CompletedCheckpointStats completedCheckpointStats) {
 
         checkArgument(checkpointID >= 0);
         checkArgument(timestamp >= 0);
@@ -140,6 +141,7 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
         this.storageLocation = checkNotNull(storageLocation);
         this.metadataHandle = storageLocation.getMetadataHandle();
         this.externalPointer = storageLocation.getExternalPointer();
+        this.completedCheckpointStats = completedCheckpointStats;
     }
 
     // ------------------------------------------------------------------------
@@ -274,14 +276,19 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
         } finally {
             operatorStates.clear();
 
-            // to be null-pointer safe, copy reference to stack
-            CompletedCheckpointStats.DiscardCallback discardCallback = this.discardCallback;
-            if (discardCallback != null) {
-                discardCallback.notifyDiscardedCheckpoint();
+            if (completedCheckpointStats != null) {
+                completedCheckpointStats.discard();
             }
         }
     }
 
+    /** NOT Thread safe. This method can be called only from CheckpointCoordinator thread. */
+    public void markAsDiscarded() {
+        if (completedCheckpointStats != null) {
+            completedCheckpointStats.discard();
+        }
+    }
+
     public boolean shouldBeDiscardedOnSubsume() {
         return props.discardOnSubsumed();
     }
@@ -320,13 +327,9 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
         return firstInterestingFields.equals(secondInterestingFields);
     }
 
-    /**
-     * Sets the callback for tracking when this checkpoint is discarded.
-     *
-     * @param discardCallback Callback to call when the checkpoint is discarded.
-     */
-    void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback discardCallback) {
-        this.discardCallback = discardCallback;
+    @Nullable
+    public CompletedCheckpointStats getStatistic() {
+        return completedCheckpointStats;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
index 38a5c9f..88339ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -183,28 +183,10 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
     }
 
     /**
-     * Returns the callback for the {@link CompletedCheckpoint}.
-     *
-     * @return Callback for the {@link CompletedCheckpoint}.
-     */
-    DiscardCallback getDiscardCallback() {
-        return new DiscardCallback();
-    }
-
-    /**
-     * Callback for the {@link CompletedCheckpoint} instance to notify about disposal of the
-     * checkpoint (most commonly when the checkpoint has been subsumed by a newer one).
+     * Mark the checkpoint has been discarded.
      */
-    class DiscardCallback {
-
-        /**
-         * Updates the discarded flag of the checkpoint stats.
-         *
-         * <p>After this notification, {@link #isDiscarded()} will return <code>true</code>.
-         */
-        void notifyDiscardedCheckpoint() {
-            discarded = true;
-        }
+    void discard() {
+        discarded = true;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
index f9d3a0d..43a2a1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.PendingCheckpointStatsCallback;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nullable;
@@ -82,7 +81,6 @@ public class FailedCheckpointStats extends PendingCheckpointStats {
                 totalSubtaskCount,
                 numAcknowledgedSubtasks,
                 taskStats,
-                FAILING_REPORT_CALLBACK,
                 checkpointedSize,
                 stateSize,
                 processedData,
@@ -122,19 +120,4 @@ public class FailedCheckpointStats extends PendingCheckpointStats {
     public String getFailureMessage() {
         return failureMsg;
     }
-
-    private static final PendingCheckpointStatsCallback FAILING_REPORT_CALLBACK =
-            new PendingCheckpointStatsCallback() {
-                @Override
-                public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
-                    throw new UnsupportedOperationException(
-                            "Failed checkpoint stats can't be completed");
-                }
-
-                @Override
-                public void reportFailedCheckpoint(FailedCheckpointStats failed) {
-                    throw new UnsupportedOperationException(
-                            "Failed checkpoint stats can't be failed");
-                }
-            };
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 8521722..aee95dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -111,6 +111,8 @@ public class PendingCheckpoint implements Checkpoint {
     /** The promise to fulfill once the checkpoint has been completed. */
     private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
 
+    @Nullable private final PendingCheckpointStats pendingCheckpointStats;
+
     private int numAcknowledgedTasks;
 
     private boolean disposed;
@@ -132,8 +134,8 @@ public class PendingCheckpoint implements Checkpoint {
             Collection<String> masterStateIdentifiers,
             CheckpointProperties props,
             CheckpointStorageLocation targetLocation,
-            CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
-
+            CompletableFuture<CompletedCheckpoint> onCompletionPromise,
+            @Nullable PendingCheckpointStats pendingCheckpointStats) {
         checkArgument(
                 checkpointPlan.getTasksToWaitFor().size() > 0,
                 "Checkpoint needs at least one vertex that commits the checkpoint");
@@ -163,6 +165,7 @@ public class PendingCheckpoint implements Checkpoint {
                         : new HashSet<>(operatorCoordinatorsToConfirm);
         this.acknowledgedTasks = new HashSet<>(checkpointPlan.getTasksToWaitFor().size());
         this.onCompletionPromise = checkNotNull(onCompletionPromise);
+        this.pendingCheckpointStats = pendingCheckpointStats;
     }
 
     // --------------------------------------------------------------------------------------------
@@ -301,7 +304,7 @@ public class PendingCheckpoint implements Checkpoint {
             CheckpointsCleaner checkpointsCleaner,
             Runnable postCleanup,
             Executor executor,
-            @Nullable PendingCheckpointStats statsCallback)
+            CheckpointStatsTracker statsTracker)
             throws IOException {
 
         synchronized (lock) {
@@ -334,26 +337,24 @@ public class PendingCheckpoint implements Checkpoint {
                                 operatorStates,
                                 masterStates,
                                 props,
-                                finalizedLocation);
+                                finalizedLocation,
+                                toCompletedCheckpointStats(finalizedLocation));
 
-                onCompletionPromise.complete(completed);
-
-                if (statsCallback != null) {
+                CompletedCheckpointStats completedCheckpointStats = completed.getStatistic();
+                if (completedCheckpointStats != null) {
                     LOG.trace(
                             "Checkpoint {} size: {}Kb, duration: {}ms",
                             checkpointId,
-                            statsCallback.getStateSize() == 0
+                            completedCheckpointStats.getStateSize() == 0
                                     ? 0
-                                    : statsCallback.getStateSize() / 1024,
-                            statsCallback.getEndToEndDuration());
-                    // Finalize the statsCallback and give the completed checkpoint a
-                    // callback for discards.
-                    CompletedCheckpointStats.DiscardCallback discardCallback =
-                            statsCallback.reportCompletedCheckpoint(
-                                    finalizedLocation.getExternalPointer());
-                    completed.setDiscardCallback(discardCallback);
+                                    : completedCheckpointStats.getStateSize() / 1024,
+                            completedCheckpointStats.getEndToEndDuration());
+
+                    statsTracker.reportCompletedCheckpoint(completedCheckpointStats);
                 }
 
+                onCompletionPromise.complete(completed);
+
                 // mark this pending checkpoint as disposed, but do NOT drop the state
                 dispose(false, checkpointsCleaner, postCleanup, executor);
 
@@ -366,6 +367,15 @@ public class PendingCheckpoint implements Checkpoint {
         }
     }
 
+    @Nullable
+    private CompletedCheckpointStats toCompletedCheckpointStats(
+            CompletedCheckpointStorageLocation finalizedLocation) {
+        return pendingCheckpointStats != null
+                ? pendingCheckpointStats.toCompletedCheckpointStats(
+                        finalizedLocation.getExternalPointer())
+                : null;
+    }
+
     /**
      * Acknowledges the task with the given execution attempt id and the given subtask state.
      *
@@ -377,8 +387,7 @@ public class PendingCheckpoint implements Checkpoint {
     public TaskAcknowledgeResult acknowledgeTask(
             ExecutionAttemptID executionAttemptId,
             TaskStateSnapshot operatorSubtaskStates,
-            CheckpointMetrics metrics,
-            @Nullable PendingCheckpointStats statsCallback) {
+            CheckpointMetrics metrics) {
 
         synchronized (lock) {
             if (disposed) {
@@ -415,7 +424,7 @@ public class PendingCheckpoint implements Checkpoint {
 
             // publish the checkpoint statistics
             // to prevent null-pointers from concurrent modification, copy reference onto stack
-            if (statsCallback != null) {
+            if (pendingCheckpointStats != null) {
                 // Do this in millis because the web frontend works with them
                 long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
                 long checkpointStartDelayMillis =
@@ -443,10 +452,12 @@ public class PendingCheckpoint implements Checkpoint {
                         subtaskStateStats.getStateSize() == 0
                                 ? 0
                                 : subtaskStateStats.getStateSize() / 1024,
-                        subtaskStateStats.getEndToEndDuration(statsCallback.getTriggerTimestamp()),
+                        subtaskStateStats.getEndToEndDuration(
+                                pendingCheckpointStats.getTriggerTimestamp()),
                         subtaskStateStats.getSyncCheckpointDuration(),
                         subtaskStateStats.getAsyncCheckpointDuration());
-                statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
+                pendingCheckpointStats.reportSubtaskStats(
+                        vertex.getJobvertexId(), subtaskStateStats);
             }
 
             return TaskAcknowledgeResult.SUCCESS;
@@ -541,11 +552,11 @@ public class PendingCheckpoint implements Checkpoint {
             CheckpointsCleaner checkpointsCleaner,
             Runnable postCleanup,
             Executor executor,
-            PendingCheckpointStats statsCallback) {
+            CheckpointStatsTracker statsTracker) {
         try {
             failureCause = new CheckpointException(reason, cause);
             onCompletionPromise.completeExceptionally(failureCause);
-            reportFailedCheckpoint(failureCause, statsCallback);
+            reportFailedCheckpoint(statsTracker, failureCause);
             assertAbortSubsumedForced(reason);
         } finally {
             dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -628,11 +639,11 @@ public class PendingCheckpoint implements Checkpoint {
      *
      * @param cause The failure cause or <code>null</code>.
      */
-    private void reportFailedCheckpoint(Exception cause, PendingCheckpointStats statsCallback) {
+    private void reportFailedCheckpoint(CheckpointStatsTracker statsTracker, Exception cause) {
         // to prevent null-pointers from concurrent modification, copy reference onto stack
-        if (statsCallback != null) {
-            long failureTimestamp = System.currentTimeMillis();
-            statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
+        if (pendingCheckpointStats != null) {
+            statsTracker.reportFailedCheckpoint(
+                    pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), cause));
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
index a575cf6..a8d74f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static java.util.stream.Collectors.toConcurrentMap;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Statistics for a pending checkpoint that is still in progress.
@@ -45,9 +44,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
 
     private static final long serialVersionUID = -973959257699390327L;
 
-    /** Tracker callback when the pending checkpoint is finalized or aborted. */
-    private final transient CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
-
     /** The current number of acknowledged subtasks. */
     private volatile int currentNumAcknowledgedSubtasks;
 
@@ -70,14 +66,12 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
      * @param triggerTimestamp Timestamp when the checkpoint was triggered.
      * @param props Checkpoint properties of the checkpoint.
      * @param taskStats Task stats for each involved operator.
-     * @param trackerCallback Callback for the {@link CheckpointStatsTracker}.
      */
     PendingCheckpointStats(
             long checkpointId,
             long triggerTimestamp,
             CheckpointProperties props,
-            Map<JobVertexID, Integer> taskStats,
-            CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) {
+            Map<JobVertexID, Integer> taskStats) {
         this(
                 checkpointId,
                 triggerTimestamp,
@@ -87,8 +81,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
                         .collect(
                                 toConcurrentMap(
                                         Map.Entry::getKey,
-                                        e -> new TaskStateStats(e.getKey(), e.getValue()))),
-                trackerCallback);
+                                        e -> new TaskStateStats(e.getKey(), e.getValue()))));
     }
 
     /**
@@ -99,15 +92,13 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
      * @param props Checkpoint properties of the checkpoint.
      * @param totalSubtaskCount Total number of subtasks for the checkpoint.
      * @param taskStats Task stats for each involved operator.
-     * @param trackerCallback Callback for the {@link CheckpointStatsTracker}.
      */
     PendingCheckpointStats(
             long checkpointId,
             long triggerTimestamp,
             CheckpointProperties props,
             int totalSubtaskCount,
-            Map<JobVertexID, TaskStateStats> taskStats,
-            CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) {
+            Map<JobVertexID, TaskStateStats> taskStats) {
         this(
                 checkpointId,
                 triggerTimestamp,
@@ -115,7 +106,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
                 totalSubtaskCount,
                 0,
                 taskStats,
-                trackerCallback,
                 0,
                 0,
                 0,
@@ -130,7 +120,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
             int totalSubtaskCount,
             int acknowledgedSubtaskCount,
             Map<JobVertexID, TaskStateStats> taskStats,
-            CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback,
             long currentCheckpointedSize,
             long currentStateSize,
             long processedData,
@@ -138,7 +127,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
             @Nullable SubtaskStateStats latestAcknowledgedSubtask) {
 
         super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
-        this.trackerCallback = checkNotNull(trackerCallback);
         this.currentCheckpointedSize = currentCheckpointedSize;
         this.currentStateSize = currentStateSize;
         this.currentProcessedData = processedData;
@@ -220,31 +208,20 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
         }
     }
 
-    /**
-     * Reports a successfully completed pending checkpoint.
-     *
-     * @param externalPointer Optional external storage path if checkpoint was externalized.
-     * @return Callback for the {@link CompletedCheckpoint} instance to notify about disposal.
-     */
-    CompletedCheckpointStats.DiscardCallback reportCompletedCheckpoint(String externalPointer) {
-        CompletedCheckpointStats completed =
-                new CompletedCheckpointStats(
-                        checkpointId,
-                        triggerTimestamp,
-                        props,
-                        numberOfSubtasks,
-                        new HashMap<>(taskStats),
-                        currentNumAcknowledgedSubtasks,
-                        currentCheckpointedSize,
-                        currentStateSize,
-                        currentProcessedData,
-                        currentPersistedData,
-                        latestAcknowledgedSubtask,
-                        externalPointer);
-
-        trackerCallback.reportCompletedCheckpoint(completed);
-
-        return completed.getDiscardCallback();
+    CompletedCheckpointStats toCompletedCheckpointStats(String externalPointer) {
+        return new CompletedCheckpointStats(
+                checkpointId,
+                triggerTimestamp,
+                props,
+                numberOfSubtasks,
+                new HashMap<>(taskStats),
+                currentNumAcknowledgedSubtasks,
+                currentCheckpointedSize,
+                currentStateSize,
+                currentProcessedData,
+                currentPersistedData,
+                latestAcknowledgedSubtask,
+                externalPointer);
     }
 
     /**
@@ -253,24 +230,21 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
      * @param failureTimestamp Timestamp of the failure.
      * @param cause Optional cause of the failure.
      */
-    void reportFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) {
-        FailedCheckpointStats failed =
-                new FailedCheckpointStats(
-                        checkpointId,
-                        triggerTimestamp,
-                        props,
-                        numberOfSubtasks,
-                        new HashMap<>(taskStats),
-                        currentNumAcknowledgedSubtasks,
-                        currentCheckpointedSize,
-                        currentStateSize,
-                        currentProcessedData,
-                        currentPersistedData,
-                        failureTimestamp,
-                        latestAcknowledgedSubtask,
-                        cause);
-
-        trackerCallback.reportFailedCheckpoint(failed);
+    FailedCheckpointStats toFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) {
+        return new FailedCheckpointStats(
+                checkpointId,
+                triggerTimestamp,
+                props,
+                numberOfSubtasks,
+                new HashMap<>(taskStats),
+                currentNumAcknowledgedSubtasks,
+                currentCheckpointedSize,
+                currentStateSize,
+                currentProcessedData,
+                currentPersistedData,
+                failureTimestamp,
+                latestAcknowledgedSubtask,
+                cause);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 3dfebab..ce90895 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -296,7 +296,8 @@ public class CheckpointCoordinatorMasterHooksTest {
                         masterHookStates,
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                         .addJobVertex(new JobVertexID())
@@ -356,7 +357,8 @@ public class CheckpointCoordinatorMasterHooksTest {
                         masterHookStates,
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 417473e..3b83973 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -773,7 +773,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         Collections.<MasterState>emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         // set up the coordinator and validate the initial state
         SharedStateRegistry sharedStateRegistry =
@@ -1089,7 +1090,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
         completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                 completedCheckpoint, new CheckpointsCleaner(), () -> {});
 
@@ -1131,7 +1133,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
         completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                 completedCheckpoint, new CheckpointsCleaner(), () -> {});
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9297c54..2cfa183 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2821,7 +2821,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         Collections.<MasterState>emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation()),
+                        new TestCompletedCheckpointStorageLocation(),
+                        null),
                 new CheckpointsCleaner(),
                 () -> {});
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 44696cb..6239892 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -239,7 +239,8 @@ public class CheckpointStateRestoreTest {
                         Collections.<MasterState>emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         coord.getCheckpointStore()
                 .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
@@ -269,7 +270,8 @@ public class CheckpointStateRestoreTest {
                         Collections.<MasterState>emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         coord.getCheckpointStore()
                 .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index c11e572..8e7b0bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -69,7 +69,7 @@ public class CheckpointStatsTrackerTest {
         pending.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
         pending.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
 
-        pending.reportCompletedCheckpoint(null);
+        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
 
         CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
         // History should be empty
@@ -118,7 +118,7 @@ public class CheckpointStatsTrackerTest {
         completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
         completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
 
-        completed1.reportCompletedCheckpoint(null);
+        tracker.reportCompletedCheckpoint(completed1.toCompletedCheckpointStats(null));
 
         // Failed checkpoint
         PendingCheckpointStats failed =
@@ -129,7 +129,7 @@ public class CheckpointStatsTrackerTest {
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         vertexToDop);
 
-        failed.reportFailedCheckpoint(12, null);
+        tracker.reportFailedCheckpoint(failed.toFailedCheckpoint(12, null));
 
         // Completed savepoint
         PendingCheckpointStats savepoint =
@@ -143,7 +143,7 @@ public class CheckpointStatsTrackerTest {
         savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
         savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
 
-        savepoint.reportCompletedCheckpoint(null);
+        tracker.reportCompletedCheckpoint(savepoint.toCompletedCheckpointStats(null));
 
         // In Progress
         PendingCheckpointStats inProgress =
@@ -242,7 +242,7 @@ public class CheckpointStatsTrackerTest {
         assertEquals(snapshot2, tracker.createSnapshot());
 
         // Complete checkpoint => new snapshot
-        pending.reportCompletedCheckpoint(null);
+        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
 
         CheckpointStatsSnapshot snapshot3 = tracker.createSnapshot();
         assertNotEquals(snapshot2, snapshot3);
@@ -422,7 +422,7 @@ public class CheckpointStatsTrackerTest {
 
         assertTrue(pending.reportSubtaskStats(jobVertexID, subtaskStats));
 
-        pending.reportCompletedCheckpoint(externalPath);
+        stats.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
 
         // Verify completed checkpoint updated
         assertEquals(Long.valueOf(1), numCheckpoints.getValue());
@@ -446,7 +446,7 @@ public class CheckpointStatsTrackerTest {
                         singletonMap(jobVertexID, 1));
 
         long failureTimestamp = 1230123L;
-        nextPending.reportFailedCheckpoint(failureTimestamp, null);
+        stats.reportFailedCheckpoint(nextPending.toFailedCheckpoint(failureTimestamp, null));
 
         // Verify updated
         assertEquals(Long.valueOf(2), numCheckpoints.getValue());
@@ -482,7 +482,7 @@ public class CheckpointStatsTrackerTest {
                         singletonMap(jobVertexID, 1));
 
         thirdPending.reportSubtaskStats(jobVertexID, subtaskStats);
-        thirdPending.reportCompletedCheckpoint(null);
+        stats.reportCompletedCheckpoint(thirdPending.toCompletedCheckpointStats(null));
 
         // Verify external path is "n/a", because internal checkpoint won't generate external path.
         assertEquals("n/a", latestCompletedExternalPath.getValue());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 122773a..62e5479 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -307,7 +307,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
                     operatorGroupState,
                     null,
                     props,
-                    new TestCompletedCheckpointStorageLocation());
+                    new TestCompletedCheckpointStorageLocation(),
+                    null);
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index c7cad82..3a29452 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -63,7 +63,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         CompletedCheckpoint checkpoint2 =
                 new CompletedCheckpoint(
@@ -75,7 +76,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
         checkpoints1.add(checkpoint1);
@@ -103,7 +105,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         CompletedCheckpoint checkpoint2 =
                 new CompletedCheckpoint(
@@ -115,7 +118,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
         checkpoints1.add(checkpoint1);
@@ -145,7 +149,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         CompletedCheckpoint checkpoint2 =
                 new CompletedCheckpoint(
@@ -157,7 +162,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
         checkpoints1.add(checkpoint1);
@@ -184,7 +190,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         CompletedCheckpoint checkpoint2 =
                 new CompletedCheckpoint(
@@ -196,7 +203,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
         checkpoints1.add(checkpoint1);
@@ -223,7 +231,8 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                        new TestCompletedCheckpointStorageLocation());
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
 
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
@@ -254,7 +263,8 @@ public class CompletedCheckpointTest {
                         operatorStates,
                         Collections.emptyList(),
                         props,
-                        location);
+                        location,
+                        null);
 
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
@@ -306,7 +316,8 @@ public class CompletedCheckpointTest {
                             new HashMap<>(operatorStates),
                             Collections.emptyList(),
                             retainProps,
-                            retainedLocation);
+                            retainedLocation,
+                            null);
 
             checkpoint.discardOnShutdown(status);
 
@@ -333,7 +344,8 @@ public class CompletedCheckpointTest {
                             new HashMap<>(operatorStates),
                             Collections.emptyList(),
                             discardProps,
-                            discardLocation);
+                            discardLocation,
+                            null);
 
             checkpoint.discardOnShutdown(status);
 
@@ -346,6 +358,23 @@ public class CompletedCheckpointTest {
     /** Tests that the stats callbacks happen if the callback is registered. */
     @Test
     public void testCompletedCheckpointStatsCallbacks() throws Exception {
+        Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+        JobVertexID jobVertexId = new JobVertexID();
+        taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1));
+        CompletedCheckpointStats checkpointStats =
+                new CompletedCheckpointStats(
+                        1,
+                        0,
+                        CheckpointProperties.forCheckpoint(
+                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                        1,
+                        taskStats,
+                        1,
+                        1,
+                        1,
+                        1,
+                        mock(SubtaskStateStats.class),
+                        null);
         CompletedCheckpoint completed =
                 new CompletedCheckpoint(
                         new JobID(),
@@ -356,14 +385,11 @@ public class CompletedCheckpointTest {
                         Collections.emptyList(),
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new TestCompletedCheckpointStorageLocation());
-
-        CompletedCheckpointStats.DiscardCallback callback =
-                mock(CompletedCheckpointStats.DiscardCallback.class);
-        completed.setDiscardCallback(callback);
+                        new TestCompletedCheckpointStorageLocation(),
+                        checkpointStats);
 
         completed.discardOnShutdown(JobStatus.FINISHED);
-        verify(callback, times(1)).notifyDiscardedCheckpoint();
+        assertTrue(checkpointStats.isDiscarded());
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index ed36db9..064e26a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -418,7 +418,8 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
                 Collections.emptyMap(),
                 Collections.emptyList(),
                 props,
-                new TestCompletedCheckpointStorageLocation());
+                new TestCompletedCheckpointStorageLocation(),
+                null);
     }
 
     private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> createStateHandles(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
index d7abd6e..82db6ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
@@ -58,7 +58,8 @@ public class DefaultCompletedCheckpointStoreUtilsTest extends TestLogger {
                 new HashMap<>(),
                 Collections.emptyList(),
                 CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
-                new TestCompletedCheckpointStorageLocation());
+                new TestCompletedCheckpointStorageLocation(),
+                null);
     }
 
     private static class FailingRetrievableStateHandle<T extends Serializable>
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index 37680ba..e46f608 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -52,17 +52,9 @@ public class PendingCheckpointStatsTest {
         taskStats.put(task1.getJobVertexId(), task1);
         taskStats.put(task2.getJobVertexId(), task2);
 
-        CheckpointStatsTracker.PendingCheckpointStatsCallback callback =
-                mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class);
-
         PendingCheckpointStats pending =
                 new PendingCheckpointStats(
-                        checkpointId,
-                        triggerTimestamp,
-                        props,
-                        totalSubtaskCount,
-                        taskStats,
-                        callback);
+                        checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
 
         // Check initial state
         assertEquals(checkpointId, pending.getCheckpointId());
@@ -129,8 +121,7 @@ public class PendingCheckpointStatsTest {
         taskStats.put(task1.getJobVertexId(), task1);
         taskStats.put(task2.getJobVertexId(), task2);
 
-        CheckpointStatsTracker.PendingCheckpointStatsCallback callback =
-                mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class);
+        CheckpointStatsTracker callback = mock(CheckpointStatsTracker.class);
 
         PendingCheckpointStats pending =
                 new PendingCheckpointStats(
@@ -139,8 +130,7 @@ public class PendingCheckpointStatsTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
-                        taskStats,
-                        callback);
+                        taskStats);
 
         // Report subtasks
         for (int i = 0; i < task1.getNumberOfSubtasks(); i++) {
@@ -154,8 +144,7 @@ public class PendingCheckpointStatsTest {
         // Report completed
         String externalPath = "asdjkasdjkasd";
 
-        CompletedCheckpointStats.DiscardCallback discardCallback =
-                pending.reportCompletedCheckpoint(externalPath);
+        callback.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
 
         ArgumentCaptor<CompletedCheckpointStats> args =
                 ArgumentCaptor.forClass(CompletedCheckpointStats.class);
@@ -166,7 +155,7 @@ public class PendingCheckpointStatsTest {
         assertNotNull(completed);
         assertEquals(CheckpointStatsStatus.COMPLETED, completed.getStatus());
         assertFalse(completed.isDiscarded());
-        discardCallback.notifyDiscardedCheckpoint();
+        completed.discard();
         assertTrue(completed.isDiscarded());
         assertEquals(externalPath, completed.getExternalPath());
 
@@ -194,8 +183,7 @@ public class PendingCheckpointStatsTest {
         taskStats.put(task1.getJobVertexId(), task1);
         taskStats.put(task2.getJobVertexId(), task2);
 
-        CheckpointStatsTracker.PendingCheckpointStatsCallback callback =
-                mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class);
+        CheckpointStatsTracker callback = mock(CheckpointStatsTracker.class);
 
         long triggerTimestamp = 123123;
         PendingCheckpointStats pending =
@@ -205,8 +193,7 @@ public class PendingCheckpointStatsTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
-                        taskStats,
-                        callback);
+                        taskStats);
 
         // Report subtasks
         for (int i = 0; i < task1.getNumberOfSubtasks(); i++) {
@@ -220,7 +207,7 @@ public class PendingCheckpointStatsTest {
         // Report failed
         Exception cause = new Exception("test exception");
         long failureTimestamp = 112211137;
-        pending.reportFailedCheckpoint(failureTimestamp, cause);
+        callback.reportFailedCheckpoint(pending.toFailedCheckpoint(failureTimestamp, cause));
 
         ArgumentCaptor<FailedCheckpointStats> args =
                 ArgumentCaptor.forClass(FailedCheckpointStats.class);
@@ -263,8 +250,7 @@ public class PendingCheckpointStatsTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         1337,
-                        taskStats,
-                        mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class));
+                        taskStats);
 
         PendingCheckpointStats copy = CommonTestUtils.createCopySerializable(pending);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 05111d6..294c6fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
@@ -74,9 +73,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -218,7 +215,7 @@ public class PendingCheckpointTest {
         future = pending.getCompletionFuture();
 
         assertFalse(future.isDone());
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         assertTrue(pending.areTasksFullyAcknowledged());
         pending.finalizeCheckpoint(
                 new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
@@ -292,75 +289,6 @@ public class PendingCheckpointTest {
         verify(state, times(1)).discardState();
     }
 
-    /** Tests that the stats callbacks happen if the callback is registered. */
-    @Test
-    public void testPendingCheckpointStatsCallbacks() throws Exception {
-        {
-            // Complete successfully
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), callback);
-            verify(callback, times(1))
-                    .reportSubtaskStats(nullable(JobVertexID.class), any(SubtaskStateStats.class));
-
-            pending.finalizeCheckpoint(
-                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), callback);
-            verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED, callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED, callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
-        }
-    }
-
     /**
      * FLINK-5985.
      *
@@ -373,7 +301,7 @@ public class PendingCheckpointTest {
                 createPendingCheckpoint(
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-        pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class), null);
+        pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
         final OperatorState expectedState =
                 new OperatorState(OPERATOR_ID, PARALLELISM, MAX_PARALLELISM);
         Assert.assertEquals(
@@ -394,7 +322,7 @@ public class PendingCheckpointTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
         pending.acknowledgeTask(
-                ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class), null);
+                ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class));
         Assert.assertFalse(pending.getOperatorStates().isEmpty());
     }
 
@@ -441,7 +369,7 @@ public class PendingCheckpointTest {
         assertTrue(pending.areMasterStatesFullyAcknowledged());
         assertFalse(pending.areTasksFullyAcknowledged());
 
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         assertTrue(pending.areTasksFullyAcknowledged());
 
         final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -494,7 +422,7 @@ public class PendingCheckpointTest {
         assertTrue(pending.areMasterStatesFullyAcknowledged());
         assertFalse(pending.areTasksFullyAcknowledged());
 
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         assertTrue(pending.areTasksFullyAcknowledged());
 
         final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -580,8 +508,7 @@ public class PendingCheckpointTest {
         checkpoint.acknowledgeTask(
                 ACK_TASKS.get(0).getAttemptId(),
                 TaskStateSnapshot.FINISHED_ON_RESTORE,
-                new CheckpointMetrics(),
-                null);
+                new CheckpointMetrics());
         assertThat(
                 recordCheckpointPlan.getReportedFinishedOnRestoreTasks(),
                 contains(ACK_TASKS.get(0).getVertex()));
@@ -595,8 +522,7 @@ public class PendingCheckpointTest {
         checkpoint.acknowledgeTask(
                 ACK_TASKS.get(0).getAttemptId(),
                 new TaskStateSnapshot(10, true),
-                new CheckpointMetrics(),
-                null);
+                new CheckpointMetrics());
         assertThat(
                 recordCheckpointPlan.getReportedOperatorsFinishedTasks(),
                 contains(ACK_TASKS.get(0).getVertex()));
@@ -648,7 +574,7 @@ public class PendingCheckpointTest {
                         Collections.emptyList(),
                         Executors.directExecutor());
 
-        checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+        checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         return checkpoint;
     }
 
@@ -718,7 +644,8 @@ public class PendingCheckpointTest {
                 masterStateIdentifiers,
                 props,
                 location,
-                new CompletableFuture<>());
+                new CompletableFuture<>(),
+                null);
     }
 
     @SuppressWarnings("unchecked")
@@ -741,12 +668,7 @@ public class PendingCheckpointTest {
             CheckpointFailureReason reason,
             PendingCheckpointStats statsCallback) {
         checkpoint.abort(
-                reason,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                Executors.directExecutor(),
-                statsCallback);
+                reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
     }
 
     private static final class QueueExecutor implements Executor {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index d36b321..9a584c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -107,7 +107,8 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
                             Collections.emptyMap(),
                             Collections.emptyList(),
                             CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
-                            new TestCompletedCheckpointStorageLocation()) {
+                            new TestCompletedCheckpointStorageLocation(),
+                            null) {
                         @Override
                         public boolean discardOnSubsume() {
                             discardAttempted.countDown();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index ee4f46c..804d285 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -254,7 +254,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
                             Collections.emptyMap(),
                             Collections.emptyList(),
                             CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
-                            new TestCompletedCheckpointStorageLocation());
+                            new TestCompletedCheckpointStorageLocation(),
+                            null);
             // shouldn't fail despite the exception
             store.addCheckpointAndSubsumeOldestOne(
                     checkpointToAdd,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 50bac5c..50a9700 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -880,7 +880,8 @@ public class JobMasterTest extends TestLogger {
                         null,
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                        new DummyCheckpointStorageLocation());
+                        new DummyCheckpointStorageLocation(),
+                        null);
 
         final StandaloneCompletedCheckpointStore completedCheckpointStore =
                 new StandaloneCompletedCheckpointStore(1);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 641a918..dfc3b40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -150,7 +150,8 @@ public class SchedulerUtilsTest extends TestLogger {
                 singletonMap(operatorID, operatorState),
                 emptyList(),
                 CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
-                new TestCompletedCheckpointStorageLocation());
+                new TestCompletedCheckpointStorageLocation(),
+                null);
     }
 
     private IncrementalRemoteKeyedStateHandle buildIncrementalHandle(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
index f6db13c..cab4abe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
@@ -223,6 +223,7 @@ public class StopWithSavepointTerminationHandlerImplTest extends TestLogger {
                 new HashMap<>(),
                 null,
                 CheckpointProperties.forSavepoint(true, SavepointFormatType.CANONICAL),
-                new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"));
+                new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"),
+                null);
     }
 }

[flink] 04/04: [FLINK-25958][runtime] Report failed statistic if adding of completed checkpoint to checkpoint store fails

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fbfdb0e468356fe71826eb6b185ecda9bc8b1de3
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:59:58 2022 +0100

    [FLINK-25958][runtime] Report failed statistic if adding of completed checkpoint to checkpoint store fails
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 15 ++++++++---
 .../CheckpointCoordinatorFailureTest.java          | 31 ++++++++++++++++++++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 10 ++++++-
 3 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5deee51..475effc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1339,9 +1339,7 @@ public class CheckpointCoordinator {
         try {
             final CompletedCheckpoint completedCheckpoint =
                     pendingCheckpoint.finalizeCheckpoint(
-                            checkpointsCleaner,
-                            this::scheduleTriggerRequest,
-                            executor);
+                            checkpointsCleaner, this::scheduleTriggerRequest, executor);
 
             failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
             return completedCheckpoint;
@@ -1401,6 +1399,7 @@ public class CheckpointCoordinator {
                 checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
             }
 
+            reportFailedCheckpoint(checkpointId, exception);
             sendAbortedMessages(tasksToAbort, checkpointId, completedCheckpoint.getTimestamp());
             throw new CheckpointException(
                     "Could not complete the pending checkpoint " + checkpointId + '.',
@@ -1409,6 +1408,16 @@ public class CheckpointCoordinator {
         }
     }
 
+    private void reportFailedCheckpoint(long checkpointId, Exception exception) {
+        PendingCheckpointStats pendingCheckpointStats =
+                statsTracker.getPendingCheckpointStats(checkpointId);
+        if (pendingCheckpointStats != null) {
+            statsTracker.reportFailedCheckpoint(
+                    pendingCheckpointStats.toFailedCheckpoint(
+                            System.currentTimeMillis(), exception));
+        }
+    }
+
     void scheduleTriggerRequest() {
         synchronized (lock) {
             if (isShutdown()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index c02ee16..3674515 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
@@ -49,6 +50,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.emptyList;
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.assertStatsMetrics;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -204,6 +206,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 new FailingCompletedCheckpointStore(failure);
 
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
         final AtomicInteger cleanupCallCount = new AtomicInteger(0);
         final CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -226,14 +230,27 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                                 })
                         .setCompletedCheckpointStore(completedCheckpointStore)
                         .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCheckpointStatsTracker(statsTracker)
                         .build();
         checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-
+        CheckpointMetrics expectedReportedMetrics =
+                new CheckpointMetricsBuilder()
+                        .setTotalBytesPersisted(18)
+                        .setBytesPersistedOfThisCheckpoint(18)
+                        .setBytesProcessedDuringAlignment(19)
+                        .setAsyncDurationMillis(20)
+                        .setAlignmentDurationNanos(123 * 1_000_000)
+                        .setCheckpointStartDelayNanos(567 * 1_000_000)
+                        .build();
         try {
             checkpointCoordinator.receiveAcknowledgeMessage(
                     new AcknowledgeCheckpoint(
-                            graph.getJobID(), attemptId, checkpointIDCounter.getLast()),
+                            graph.getJobID(),
+                            attemptId,
+                            checkpointIDCounter.getLast(),
+                            expectedReportedMetrics,
+                            new TaskStateSnapshot()),
                     "unknown location");
             fail("CheckpointException should have been thrown.");
         } catch (CheckpointException e) {
@@ -242,6 +259,16 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                     is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
         }
 
+        AbstractCheckpointStats actualStats =
+                statsTracker
+                        .createSnapshot()
+                        .getHistory()
+                        .getCheckpointById(checkpointIDCounter.getLast());
+
+        assertEquals(checkpointIDCounter.getLast(), actualStats.getCheckpointId());
+        assertEquals(CheckpointStatsStatus.FAILED, actualStats.getStatus());
+        assertStatsMetrics(vertex.getJobvertexId(), 0, expectedReportedMetrics, actualStats);
+
         assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index df8d606..3c09945 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,8 +299,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
             AbstractCheckpointStats actual) {
         assertEquals(checkpointId, actual.getCheckpointId());
         assertEquals(CheckpointStatsStatus.FAILED, actual.getStatus());
-        assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
         assertEquals(0, actual.getNumberOfAcknowledgedSubtasks());
+        assertStatsMetrics(jobVertexID, subtasIdx, expected, actual);
+    }
+
+    public static void assertStatsMetrics(
+            JobVertexID jobVertexID,
+            int subtasIdx,
+            CheckpointMetrics expected,
+            AbstractCheckpointStats actual) {
+        assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
         SubtaskStateStats taskStats =
                 actual.getAllTaskStateStats().stream()
                         .filter(s -> s.getJobVertexId().equals(jobVertexID))

[flink] 03/04: [FLINK-25958][runtime] Report completed statistic only after the completed checkpoint will be added to checkpoint store

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 906f3244a3febf3bfe5221290d4cd3ad4746765f
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:55:31 2022 +0100

    [FLINK-25958][runtime] Report completed statistic only after the completed checkpoint will be added to checkpoint store
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 31 ++++++++++++++++++++--
 .../runtime/checkpoint/PendingCheckpoint.java      | 18 +------------
 .../checkpoint/CheckpointCoordinatorTest.java      | 21 +++++++++++++--
 .../runtime/checkpoint/PendingCheckpointTest.java  |  5 ++--
 4 files changed, 51 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5248538..5deee51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1256,11 +1256,39 @@ public class CheckpointCoordinator {
             } else {
                 lastSubsumed = null;
             }
+
+            reportCompletedCheckpoint(completedCheckpoint);
         } finally {
             pendingCheckpoints.remove(checkpointId);
             scheduleTriggerRequest();
         }
 
+        cleanupAfterCompletedCheckpoint(
+                pendingCheckpoint, checkpointId, completedCheckpoint, lastSubsumed, props);
+    }
+
+    private void reportCompletedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        CompletedCheckpointStats completedCheckpointStats = completedCheckpoint.getStatistic();
+        if (completedCheckpointStats != null) {
+            LOG.trace(
+                    "Checkpoint {} size: {}Kb, duration: {}ms",
+                    completedCheckpoint.getCheckpointID(),
+                    completedCheckpointStats.getStateSize() == 0
+                            ? 0
+                            : completedCheckpointStats.getStateSize() / 1024,
+                    completedCheckpointStats.getEndToEndDuration());
+            // Finalize the statsCallback and give the completed checkpoint a
+            // callback for discards.
+            statsTracker.reportCompletedCheckpoint(completedCheckpointStats);
+        }
+    }
+
+    private void cleanupAfterCompletedCheckpoint(
+            PendingCheckpoint pendingCheckpoint,
+            long checkpointId,
+            CompletedCheckpoint completedCheckpoint,
+            CompletedCheckpoint lastSubsumed,
+            CheckpointProperties props) {
         // remember recent checkpoint id for debugging purposes
         rememberRecentCheckpointId(checkpointId);
 
@@ -1313,8 +1341,7 @@ public class CheckpointCoordinator {
                     pendingCheckpoint.finalizeCheckpoint(
                             checkpointsCleaner,
                             this::scheduleTriggerRequest,
-                            executor,
-                            statsTracker);
+                            executor);
 
             failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
             return completedCheckpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 76c6a44..08cd23f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -303,10 +303,7 @@ public class PendingCheckpoint implements Checkpoint {
     }
 
     public CompletedCheckpoint finalizeCheckpoint(
-            CheckpointsCleaner checkpointsCleaner,
-            Runnable postCleanup,
-            Executor executor,
-            CheckpointStatsTracker statsTracker)
+            CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, Executor executor)
             throws IOException {
 
         synchronized (lock) {
@@ -342,19 +339,6 @@ public class PendingCheckpoint implements Checkpoint {
                                 finalizedLocation,
                                 toCompletedCheckpointStats(finalizedLocation));
 
-                CompletedCheckpointStats completedCheckpointStats = completed.getStatistic();
-                if (completedCheckpointStats != null) {
-                    LOG.trace(
-                            "Checkpoint {} size: {}Kb, duration: {}ms",
-                            checkpointId,
-                            completedCheckpointStats.getStateSize() == 0
-                                    ? 0
-                                    : completedCheckpointStats.getStateSize() / 1024,
-                            completedCheckpointStats.getEndToEndDuration());
-
-                    statsTracker.reportCompletedCheckpoint(completedCheckpointStats);
-                }
-
                 onCompletionPromise.complete(completed);
 
                 // mark this pending checkpoint as disposed, but do NOT drop the state
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 2cfa183..df8d606 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1988,9 +1988,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
         ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
-
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
         // set up the coordinator and validate the initial state
-        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setExecutionGraph(graph)
+                        .setCheckpointCoordinatorConfiguration(
+                                CheckpointCoordinatorConfiguration.builder()
+                                        .setAlignedCheckpointTimeout(Long.MAX_VALUE)
+                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                                        .build())
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCheckpointStatsTracker(statsTracker)
+                        .build();
 
         assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
         assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -2083,6 +2094,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
         assertEquals(pending.getCheckpointId(), success.getCheckpointID());
         assertEquals(2, success.getOperatorStates().size());
 
+        AbstractCheckpointStats actualStats =
+                statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
+
+        assertEquals(checkpointId, actualStats.getCheckpointId());
+        assertEquals(CheckpointStatsStatus.COMPLETED, actualStats.getStatus());
+
         checkpointCoordinator.shutdown();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 294c6fa..799d743 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -217,8 +217,7 @@ public class PendingCheckpointTest {
         assertFalse(future.isDone());
         pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
         assertTrue(pending.areTasksFullyAcknowledged());
-        pending.finalizeCheckpoint(
-                new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
+        pending.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
         assertTrue(future.isDone());
 
         // Finalize (missing ACKs)
@@ -228,7 +227,7 @@ public class PendingCheckpointTest {
         assertFalse(future.isDone());
         try {
             pending.finalizeCheckpoint(
-                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
+                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
             fail("Did not throw expected Exception");
         } catch (IllegalStateException ignored) {
             // Expected