You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/08 09:23:09 UTC

[flink] branch master updated (4a5ef8a -> 35757ed)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4a5ef8a  [hotfix][docs] Removes link formatting of anchors
     new 3edac35  [hotfix][tests] Extract PendingCheckpointTest.abort method
     new 35757ed  [FLINK-21033][checkpointing] Remove PendingCheckpoint.statsCallback

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/checkpoint/CheckpointCoordinator.java  |  37 +++--
 .../runtime/checkpoint/CheckpointStatsTracker.java |  10 ++
 .../runtime/checkpoint/PendingCheckpoint.java      |  30 ++--
 .../runtime/checkpoint/PendingCheckpointTest.java  | 163 ++++++---------------
 4 files changed, 92 insertions(+), 148 deletions(-)


[flink] 01/02: [hotfix][tests] Extract PendingCheckpointTest.abort method

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3edac35872c6682e95dfd66afb5dae4b039b61c4
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Sat Feb 6 14:31:08 2021 +0100

    [hotfix][tests] Extract PendingCheckpointTest.abort method
---
 .../runtime/checkpoint/PendingCheckpointTest.java  | 117 ++++-----------------
 1 file changed, 21 insertions(+), 96 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 6b72ebf..0ebad88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -114,12 +114,7 @@ public class PendingCheckpointTest {
         assertFalse(pending.canBeSubsumed());
 
         try {
-            pending.abort(
-                    CheckpointFailureReason.CHECKPOINT_SUBSUMED,
-                    null,
-                    new CheckpointsCleaner(),
-                    () -> {},
-                    Executors.directExecutor());
+            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
             fail("Did not throw expected Exception");
         } catch (IllegalStateException ignored) {
             // Expected
@@ -141,12 +136,7 @@ public class PendingCheckpointTest {
         assertFalse(pending.canBeSubsumed());
 
         try {
-            pending.abort(
-                    CheckpointFailureReason.CHECKPOINT_SUBSUMED,
-                    null,
-                    new CheckpointsCleaner(),
-                    () -> {},
-                    Executors.directExecutor());
+            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
             fail("Did not throw expected Exception");
         } catch (IllegalStateException ignored) {
             // Expected
@@ -168,12 +158,7 @@ public class PendingCheckpointTest {
         CompletableFuture<CompletedCheckpoint> future = pending.getCompletionFuture();
 
         assertFalse(future.isDone());
-        pending.abort(
-                CheckpointFailureReason.CHECKPOINT_DECLINED,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                Executors.directExecutor());
+        abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
         assertTrue(future.isDone());
 
         // Abort expired
@@ -181,12 +166,7 @@ public class PendingCheckpointTest {
         future = pending.getCompletionFuture();
 
         assertFalse(future.isDone());
-        pending.abort(
-                CheckpointFailureReason.CHECKPOINT_DECLINED,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                Executors.directExecutor());
+        abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
         assertTrue(future.isDone());
 
         // Abort subsumed
@@ -194,12 +174,7 @@ public class PendingCheckpointTest {
         future = pending.getCompletionFuture();
 
         assertFalse(future.isDone());
-        pending.abort(
-                CheckpointFailureReason.CHECKPOINT_DECLINED,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                Executors.directExecutor());
+        abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
         assertTrue(future.isDone());
 
         // Finalize (all ACK'd)
@@ -241,12 +216,7 @@ public class PendingCheckpointTest {
         PendingCheckpoint pending = createPendingCheckpoint(props, executor);
         setTaskState(pending, state);
 
-        pending.abort(
-                CheckpointFailureReason.CHECKPOINT_DECLINED,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                executor);
+        abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
         // execute asynchronous discard operation
         executor.runQueuedCommands();
         verify(state, times(1)).discardState();
@@ -257,12 +227,7 @@ public class PendingCheckpointTest {
         pending = createPendingCheckpoint(props, executor);
         setTaskState(pending, state);
 
-        pending.abort(
-                CheckpointFailureReason.CHECKPOINT_DECLINED,
-                new Exception("Expected Test Exception"),
-                new CheckpointsCleaner(),
-                () -> {},
-                executor);
+        abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
         // execute asynchronous discard operation
         executor.runQueuedCommands();
         verify(state, times(1)).discardState();
@@ -273,12 +238,7 @@ public class PendingCheckpointTest {
         pending = createPendingCheckpoint(props, executor);
         setTaskState(pending, state);
 
-        pending.abort(
-                CheckpointFailureReason.CHECKPOINT_EXPIRED,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                executor);
+        abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED);
         // execute asynchronous discard operation
         executor.runQueuedCommands();
         verify(state, times(1)).discardState();
@@ -289,12 +249,7 @@ public class PendingCheckpointTest {
         pending = createPendingCheckpoint(props, executor);
         setTaskState(pending, state);
 
-        pending.abort(
-                CheckpointFailureReason.CHECKPOINT_SUBSUMED,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                executor);
+        abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
         // execute asynchronous discard operation
         executor.runQueuedCommands();
         verify(state, times(1)).discardState();
@@ -330,12 +285,7 @@ public class PendingCheckpointTest {
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
             pending.setStatsCallback(callback);
 
-            pending.abort(
-                    CheckpointFailureReason.CHECKPOINT_SUBSUMED,
-                    null,
-                    new CheckpointsCleaner(),
-                    () -> {},
-                    Executors.directExecutor());
+            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
 
@@ -348,12 +298,7 @@ public class PendingCheckpointTest {
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
             pending.setStatsCallback(callback);
 
-            pending.abort(
-                    CheckpointFailureReason.CHECKPOINT_DECLINED,
-                    null,
-                    new CheckpointsCleaner(),
-                    () -> {},
-                    Executors.directExecutor());
+            abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
 
@@ -366,12 +311,7 @@ public class PendingCheckpointTest {
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
             pending.setStatsCallback(callback);
 
-            pending.abort(
-                    CheckpointFailureReason.CHECKPOINT_SUBSUMED,
-                    new Exception("Expected test error"),
-                    new CheckpointsCleaner(),
-                    () -> {},
-                    Executors.directExecutor());
+            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
 
@@ -384,12 +324,7 @@ public class PendingCheckpointTest {
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
             pending.setStatsCallback(callback);
 
-            pending.abort(
-                    CheckpointFailureReason.CHECKPOINT_EXPIRED,
-                    null,
-                    new CheckpointsCleaner(),
-                    () -> {},
-                    Executors.directExecutor());
+            abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
     }
@@ -435,12 +370,7 @@ public class PendingCheckpointTest {
                         false, CheckpointType.CHECKPOINT, true, true, true, true, true);
 
         PendingCheckpoint aborted = createPendingCheckpoint(props);
-        aborted.abort(
-                CheckpointFailureReason.CHECKPOINT_DECLINED,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                Executors.directExecutor());
+        abort(aborted, CheckpointFailureReason.CHECKPOINT_DECLINED);
         assertTrue(aborted.isDisposed());
         assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
 
@@ -448,12 +378,7 @@ public class PendingCheckpointTest {
         ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
 
         assertTrue(pending.setCancellerHandle(canceller));
-        pending.abort(
-                CheckpointFailureReason.CHECKPOINT_DECLINED,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                Executors.directExecutor());
+        abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
         verify(canceller).cancel(false);
     }
 
@@ -604,12 +529,7 @@ public class PendingCheckpointTest {
         final PendingCheckpoint checkpoint =
                 createPendingCheckpointWithAcknowledgedCoordinators(handle1, handle2);
 
-        checkpoint.abort(
-                CheckpointFailureReason.CHECKPOINT_EXPIRED,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                Executors.directExecutor());
+        abort(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
 
         assertTrue(handle1.isDisposed());
         assertTrue(handle2.isDisposed());
@@ -713,6 +633,11 @@ public class PendingCheckpointTest {
         taskStates.put(new OperatorID(), state);
     }
 
+    private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) {
+        checkpoint.abort(
+                reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+    }
+
     private static final class QueueExecutor implements Executor {
 
         private final Queue<Runnable> queue = new ArrayDeque<>(4);


[flink] 02/02: [FLINK-21033][checkpointing] Remove PendingCheckpoint.statsCallback

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35757ed3c11f32769d6d99882596f78f8b8e12b5
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Sat Feb 6 14:35:58 2021 +0100

    [FLINK-21033][checkpointing] Remove PendingCheckpoint.statsCallback
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 37 +++++++++++------
 .../runtime/checkpoint/CheckpointStatsTracker.java | 10 +++++
 .../runtime/checkpoint/PendingCheckpoint.java      | 30 +++++---------
 .../runtime/checkpoint/PendingCheckpointTest.java  | 48 +++++++++++++---------
 4 files changed, 72 insertions(+), 53 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d4fa785..23ce6c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -717,7 +717,7 @@ public class CheckpointCoordinator {
                         checkpointStorageLocation,
                         onCompletionPromise);
 
-        reportToStatsTracker(checkpoint, checkpointPlan.getTasksToWaitFor());
+        trackPendingCheckpointStats(checkpoint);
 
         synchronized (lock) {
             pendingCheckpoints.put(checkpointID, checkpoint);
@@ -1052,7 +1052,8 @@ public class CheckpointCoordinator {
                 switch (checkpoint.acknowledgeTask(
                         message.getTaskExecutionId(),
                         message.getSubtaskState(),
-                        message.getCheckpointMetrics())) {
+                        message.getCheckpointMetrics(),
+                        getStatsCallback(checkpoint))) {
                     case SUCCESS:
                         LOG.debug(
                                 "Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
@@ -1173,7 +1174,11 @@ public class CheckpointCoordinator {
             try {
                 completedCheckpoint =
                         pendingCheckpoint.finalizeCheckpoint(
-                                checkpointsCleaner, this::scheduleTriggerRequest, executor);
+                                checkpointsCleaner,
+                                this::scheduleTriggerRequest,
+                                executor,
+                                getStatsCallback(pendingCheckpoint));
+
                 failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
             } catch (Exception e1) {
                 // abort the current pending checkpoint if we fails to finalize the pending
@@ -1908,7 +1913,8 @@ public class CheckpointCoordinator {
                         exception.getCause(),
                         checkpointsCleaner,
                         this::scheduleTriggerRequest,
-                        executor);
+                        executor,
+                        getStatsCallback(pendingCheckpoint));
 
                 if (pendingCheckpoint.getProps().isSavepoint()
                         && pendingCheckpoint.getProps().isSynchronous()) {
@@ -2049,24 +2055,29 @@ public class CheckpointCoordinator {
         SKIP;
     }
 
-    private void reportToStatsTracker(
-            PendingCheckpoint checkpoint, Map<ExecutionAttemptID, ExecutionVertex> tasks) {
+    private void trackPendingCheckpointStats(PendingCheckpoint checkpoint) {
         if (statsTracker == null) {
             return;
         }
         Map<JobVertexID, Integer> vertices =
-                tasks.values().stream()
+                checkpoint.getCheckpointPlan().getTasksToWaitFor().values().stream()
                         .map(ExecutionVertex::getJobVertex)
                         .distinct()
                         .collect(
                                 toMap(
                                         ExecutionJobVertex::getJobVertexId,
                                         ExecutionJobVertex::getParallelism));
-        checkpoint.setStatsCallback(
-                statsTracker.reportPendingCheckpoint(
-                        checkpoint.getCheckpointID(),
-                        checkpoint.getCheckpointTimestamp(),
-                        checkpoint.getProps(),
-                        vertices));
+        statsTracker.reportPendingCheckpoint(
+                checkpoint.getCheckpointID(),
+                checkpoint.getCheckpointTimestamp(),
+                checkpoint.getProps(),
+                vertices);
+    }
+
+    @Nullable
+    private PendingCheckpointStats getStatsCallback(PendingCheckpoint pendingCheckpoint) {
+        return statsTracker == null
+                ? null
+                : statsTracker.getPendingCheckpointStats(pendingCheckpoint.getCheckpointID());
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index e8883dd..8c4ac97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -255,6 +255,16 @@ public class CheckpointStatsTracker {
         }
     }
 
+    public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) {
+        statsReadWriteLock.lock();
+        try {
+            AbstractCheckpointStats stats = history.getCheckpointById(checkpointId);
+            return stats instanceof PendingCheckpointStats ? (PendingCheckpointStats) stats : null;
+        } finally {
+            statsReadWriteLock.unlock();
+        }
+    }
+
     public void reportIncompleteStats(
             long checkpointId, ExecutionVertex vertex, CheckpointMetrics metrics) {
         statsReadWriteLock.lock();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6123e19..e44d2fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -116,9 +116,6 @@ public class PendingCheckpoint implements Checkpoint {
 
     private boolean discarded;
 
-    /** Optional stats tracker callback. */
-    @Nullable private PendingCheckpointStats statsCallback;
-
     private volatile ScheduledFuture<?> cancellerHandle;
 
     private CheckpointException failureCause;
@@ -257,15 +254,6 @@ public class PendingCheckpoint implements Checkpoint {
     }
 
     /**
-     * Sets the callback for tracking this pending checkpoint.
-     *
-     * @param trackerCallback Callback for collecting subtask stats.
-     */
-    void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
-        this.statsCallback = trackerCallback;
-    }
-
-    /**
      * Sets the handle for the canceller to this pending checkpoint. This method fails with an
      * exception if a handle has already been set.
      *
@@ -304,7 +292,10 @@ public class PendingCheckpoint implements Checkpoint {
     }
 
     public CompletedCheckpoint finalizeCheckpoint(
-            CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, Executor executor)
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanup,
+            Executor executor,
+            @Nullable PendingCheckpointStats statsCallback)
             throws IOException {
 
         synchronized (lock) {
@@ -340,7 +331,6 @@ public class PendingCheckpoint implements Checkpoint {
                 onCompletionPromise.complete(completed);
 
                 // to prevent null-pointers from concurrent modification, copy reference onto stack
-                PendingCheckpointStats statsCallback = this.statsCallback;
                 if (statsCallback != null) {
                     // Finalize the statsCallback and give the completed checkpoint a
                     // callback for discards.
@@ -373,7 +363,8 @@ public class PendingCheckpoint implements Checkpoint {
     public TaskAcknowledgeResult acknowledgeTask(
             ExecutionAttemptID executionAttemptId,
             TaskStateSnapshot operatorSubtaskStates,
-            CheckpointMetrics metrics) {
+            CheckpointMetrics metrics,
+            @Nullable PendingCheckpointStats statsCallback) {
 
         synchronized (lock) {
             if (disposed) {
@@ -428,7 +419,6 @@ public class PendingCheckpoint implements Checkpoint {
 
             // publish the checkpoint statistics
             // to prevent null-pointers from concurrent modification, copy reference onto stack
-            final PendingCheckpointStats statsCallback = this.statsCallback;
             if (statsCallback != null) {
                 // Do this in millis because the web frontend works with them
                 long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
@@ -518,11 +508,12 @@ public class PendingCheckpoint implements Checkpoint {
             @Nullable Throwable cause,
             CheckpointsCleaner checkpointsCleaner,
             Runnable postCleanup,
-            Executor executor) {
+            Executor executor,
+            PendingCheckpointStats statsCallback) {
         try {
             failureCause = new CheckpointException(reason, cause);
             onCompletionPromise.completeExceptionally(failureCause);
-            reportFailedCheckpoint(failureCause);
+            reportFailedCheckpoint(failureCause, statsCallback);
             assertAbortSubsumedForced(reason);
         } finally {
             dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -605,9 +596,8 @@ public class PendingCheckpoint implements Checkpoint {
      *
      * @param cause The failure cause or <code>null</code>.
      */
-    private void reportFailedCheckpoint(Exception cause) {
+    private void reportFailedCheckpoint(Exception cause, PendingCheckpointStats statsCallback) {
         // to prevent null-pointers from concurrent modification, copy reference onto stack
-        final PendingCheckpointStats statsCallback = this.statsCallback;
         if (statsCallback != null) {
             long failureTimestamp = System.currentTimeMillis();
             statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 0ebad88..52be1bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -182,9 +182,10 @@ public class PendingCheckpointTest {
         future = pending.getCompletionFuture();
 
         assertFalse(future.isDone());
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
         assertTrue(pending.areTasksFullyAcknowledged());
-        pending.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+        pending.finalizeCheckpoint(
+                new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
         assertTrue(future.isDone());
 
         // Finalize (missing ACKs)
@@ -194,7 +195,7 @@ public class PendingCheckpointTest {
         assertFalse(future.isDone());
         try {
             pending.finalizeCheckpoint(
-                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
             fail("Did not throw expected Exception");
         } catch (IllegalStateException ignored) {
             // Expected
@@ -265,14 +266,13 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+            pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), callback);
             verify(callback, times(1))
                     .reportSubtaskStats(nullable(JobVertexID.class), any(SubtaskStateStats.class));
 
             pending.finalizeCheckpoint(
-                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), callback);
             verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
         }
 
@@ -283,9 +283,8 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
+            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
 
@@ -296,9 +295,8 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
+            abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED, callback);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
 
@@ -309,9 +307,8 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
+            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
 
@@ -322,9 +319,8 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED);
+            abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED, callback);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
     }
@@ -341,7 +337,7 @@ public class PendingCheckpointTest {
                 createPendingCheckpoint(
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-        pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
+        pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class), null);
         Assert.assertTrue(pending.getOperatorStates().isEmpty());
     }
 
@@ -359,7 +355,7 @@ public class PendingCheckpointTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
         pending.acknowledgeTask(
-                ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class));
+                ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class), null);
         Assert.assertFalse(pending.getOperatorStates().isEmpty());
     }
 
@@ -406,7 +402,7 @@ public class PendingCheckpointTest {
         assertTrue(pending.areMasterStatesFullyAcknowledged());
         assertFalse(pending.areTasksFullyAcknowledged());
 
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
         assertTrue(pending.areTasksFullyAcknowledged());
 
         final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -459,7 +455,7 @@ public class PendingCheckpointTest {
         assertTrue(pending.areMasterStatesFullyAcknowledged());
         assertFalse(pending.areTasksFullyAcknowledged());
 
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
         assertTrue(pending.areTasksFullyAcknowledged());
 
         final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -570,7 +566,7 @@ public class PendingCheckpointTest {
                         Collections.emptyList(),
                         Executors.directExecutor());
 
-        checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+        checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
         return checkpoint;
     }
 
@@ -634,8 +630,20 @@ public class PendingCheckpointTest {
     }
 
     private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) {
+        abort(checkpoint, reason, null);
+    }
+
+    private void abort(
+            PendingCheckpoint checkpoint,
+            CheckpointFailureReason reason,
+            PendingCheckpointStats statsCallback) {
         checkpoint.abort(
-                reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+                reason,
+                null,
+                new CheckpointsCleaner(),
+                () -> {},
+                Executors.directExecutor(),
+                statsCallback);
     }
 
     private static final class QueueExecutor implements Executor {