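You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was lost in this plain-text rendering; see the mailing-list archive for the original message).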
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/08 09:23:11 UTC

[flink] 02/02: [FLINK-21033][checkpointing] Remove PendingCheckpoint.statsCallback

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35757ed3c11f32769d6d99882596f78f8b8e12b5
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Sat Feb 6 14:35:58 2021 +0100

    [FLINK-21033][checkpointing] Remove PendingCheckpoint.statsCallback
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 37 +++++++++++------
 .../runtime/checkpoint/CheckpointStatsTracker.java | 10 +++++
 .../runtime/checkpoint/PendingCheckpoint.java      | 30 +++++---------
 .../runtime/checkpoint/PendingCheckpointTest.java  | 48 +++++++++++++---------
 4 files changed, 72 insertions(+), 53 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d4fa785..23ce6c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -717,7 +717,7 @@ public class CheckpointCoordinator {
                         checkpointStorageLocation,
                         onCompletionPromise);
 
-        reportToStatsTracker(checkpoint, checkpointPlan.getTasksToWaitFor());
+        trackPendingCheckpointStats(checkpoint);
 
         synchronized (lock) {
             pendingCheckpoints.put(checkpointID, checkpoint);
@@ -1052,7 +1052,8 @@ public class CheckpointCoordinator {
                 switch (checkpoint.acknowledgeTask(
                         message.getTaskExecutionId(),
                         message.getSubtaskState(),
-                        message.getCheckpointMetrics())) {
+                        message.getCheckpointMetrics(),
+                        getStatsCallback(checkpoint))) {
                     case SUCCESS:
                         LOG.debug(
                                 "Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
@@ -1173,7 +1174,11 @@ public class CheckpointCoordinator {
             try {
                 completedCheckpoint =
                         pendingCheckpoint.finalizeCheckpoint(
-                                checkpointsCleaner, this::scheduleTriggerRequest, executor);
+                                checkpointsCleaner,
+                                this::scheduleTriggerRequest,
+                                executor,
+                                getStatsCallback(pendingCheckpoint));
+
                 failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
             } catch (Exception e1) {
                 // abort the current pending checkpoint if we fails to finalize the pending
@@ -1908,7 +1913,8 @@ public class CheckpointCoordinator {
                         exception.getCause(),
                         checkpointsCleaner,
                         this::scheduleTriggerRequest,
-                        executor);
+                        executor,
+                        getStatsCallback(pendingCheckpoint));
 
                 if (pendingCheckpoint.getProps().isSavepoint()
                         && pendingCheckpoint.getProps().isSynchronous()) {
@@ -2049,24 +2055,29 @@ public class CheckpointCoordinator {
         SKIP;
     }
 
-    private void reportToStatsTracker(
-            PendingCheckpoint checkpoint, Map<ExecutionAttemptID, ExecutionVertex> tasks) {
+    private void trackPendingCheckpointStats(PendingCheckpoint checkpoint) {
         if (statsTracker == null) {
             return;
         }
         Map<JobVertexID, Integer> vertices =
-                tasks.values().stream()
+                checkpoint.getCheckpointPlan().getTasksToWaitFor().values().stream()
                         .map(ExecutionVertex::getJobVertex)
                         .distinct()
                         .collect(
                                 toMap(
                                         ExecutionJobVertex::getJobVertexId,
                                         ExecutionJobVertex::getParallelism));
-        checkpoint.setStatsCallback(
-                statsTracker.reportPendingCheckpoint(
-                        checkpoint.getCheckpointID(),
-                        checkpoint.getCheckpointTimestamp(),
-                        checkpoint.getProps(),
-                        vertices));
+        statsTracker.reportPendingCheckpoint(
+                checkpoint.getCheckpointID(),
+                checkpoint.getCheckpointTimestamp(),
+                checkpoint.getProps(),
+                vertices);
+    }
+
+    @Nullable
+    private PendingCheckpointStats getStatsCallback(PendingCheckpoint pendingCheckpoint) {
+        return statsTracker == null
+                ? null
+                : statsTracker.getPendingCheckpointStats(pendingCheckpoint.getCheckpointID());
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index e8883dd..8c4ac97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -255,6 +255,16 @@ public class CheckpointStatsTracker {
         }
     }
 
+    public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) {
+        statsReadWriteLock.lock();
+        try {
+            AbstractCheckpointStats stats = history.getCheckpointById(checkpointId);
+            return stats instanceof PendingCheckpointStats ? (PendingCheckpointStats) stats : null;
+        } finally {
+            statsReadWriteLock.unlock();
+        }
+    }
+
     public void reportIncompleteStats(
             long checkpointId, ExecutionVertex vertex, CheckpointMetrics metrics) {
         statsReadWriteLock.lock();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6123e19..e44d2fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -116,9 +116,6 @@ public class PendingCheckpoint implements Checkpoint {
 
     private boolean discarded;
 
-    /** Optional stats tracker callback. */
-    @Nullable private PendingCheckpointStats statsCallback;
-
     private volatile ScheduledFuture<?> cancellerHandle;
 
     private CheckpointException failureCause;
@@ -257,15 +254,6 @@ public class PendingCheckpoint implements Checkpoint {
     }
 
     /**
-     * Sets the callback for tracking this pending checkpoint.
-     *
-     * @param trackerCallback Callback for collecting subtask stats.
-     */
-    void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
-        this.statsCallback = trackerCallback;
-    }
-
-    /**
      * Sets the handle for the canceller to this pending checkpoint. This method fails with an
      * exception if a handle has already been set.
      *
@@ -304,7 +292,10 @@ public class PendingCheckpoint implements Checkpoint {
     }
 
     public CompletedCheckpoint finalizeCheckpoint(
-            CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, Executor executor)
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanup,
+            Executor executor,
+            @Nullable PendingCheckpointStats statsCallback)
             throws IOException {
 
         synchronized (lock) {
@@ -340,7 +331,6 @@ public class PendingCheckpoint implements Checkpoint {
                 onCompletionPromise.complete(completed);
 
                 // to prevent null-pointers from concurrent modification, copy reference onto stack
-                PendingCheckpointStats statsCallback = this.statsCallback;
                 if (statsCallback != null) {
                     // Finalize the statsCallback and give the completed checkpoint a
                     // callback for discards.
@@ -373,7 +363,8 @@ public class PendingCheckpoint implements Checkpoint {
     public TaskAcknowledgeResult acknowledgeTask(
             ExecutionAttemptID executionAttemptId,
             TaskStateSnapshot operatorSubtaskStates,
-            CheckpointMetrics metrics) {
+            CheckpointMetrics metrics,
+            @Nullable PendingCheckpointStats statsCallback) {
 
         synchronized (lock) {
             if (disposed) {
@@ -428,7 +419,6 @@ public class PendingCheckpoint implements Checkpoint {
 
             // publish the checkpoint statistics
             // to prevent null-pointers from concurrent modification, copy reference onto stack
-            final PendingCheckpointStats statsCallback = this.statsCallback;
             if (statsCallback != null) {
                 // Do this in millis because the web frontend works with them
                 long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
@@ -518,11 +508,12 @@ public class PendingCheckpoint implements Checkpoint {
             @Nullable Throwable cause,
             CheckpointsCleaner checkpointsCleaner,
             Runnable postCleanup,
-            Executor executor) {
+            Executor executor,
+            PendingCheckpointStats statsCallback) {
         try {
             failureCause = new CheckpointException(reason, cause);
             onCompletionPromise.completeExceptionally(failureCause);
-            reportFailedCheckpoint(failureCause);
+            reportFailedCheckpoint(failureCause, statsCallback);
             assertAbortSubsumedForced(reason);
         } finally {
             dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -605,9 +596,8 @@ public class PendingCheckpoint implements Checkpoint {
      *
      * @param cause The failure cause or <code>null</code>.
      */
-    private void reportFailedCheckpoint(Exception cause) {
+    private void reportFailedCheckpoint(Exception cause, PendingCheckpointStats statsCallback) {
         // to prevent null-pointers from concurrent modification, copy reference onto stack
-        final PendingCheckpointStats statsCallback = this.statsCallback;
         if (statsCallback != null) {
             long failureTimestamp = System.currentTimeMillis();
             statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 0ebad88..52be1bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -182,9 +182,10 @@ public class PendingCheckpointTest {
         future = pending.getCompletionFuture();
 
         assertFalse(future.isDone());
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
         assertTrue(pending.areTasksFullyAcknowledged());
-        pending.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+        pending.finalizeCheckpoint(
+                new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
         assertTrue(future.isDone());
 
         // Finalize (missing ACKs)
@@ -194,7 +195,7 @@ public class PendingCheckpointTest {
         assertFalse(future.isDone());
         try {
             pending.finalizeCheckpoint(
-                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
             fail("Did not throw expected Exception");
         } catch (IllegalStateException ignored) {
             // Expected
@@ -265,14 +266,13 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+            pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), callback);
             verify(callback, times(1))
                     .reportSubtaskStats(nullable(JobVertexID.class), any(SubtaskStateStats.class));
 
             pending.finalizeCheckpoint(
-                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+                    new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), callback);
             verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
         }
 
@@ -283,9 +283,8 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
+            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
 
@@ -296,9 +295,8 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
+            abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED, callback);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
 
@@ -309,9 +307,8 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
+            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
 
@@ -322,9 +319,8 @@ public class PendingCheckpointTest {
                     createPendingCheckpoint(
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-            pending.setStatsCallback(callback);
 
-            abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED);
+            abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED, callback);
             verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
         }
     }
@@ -341,7 +337,7 @@ public class PendingCheckpointTest {
                 createPendingCheckpoint(
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-        pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
+        pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class), null);
         Assert.assertTrue(pending.getOperatorStates().isEmpty());
     }
 
@@ -359,7 +355,7 @@ public class PendingCheckpointTest {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
         pending.acknowledgeTask(
-                ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class));
+                ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class), null);
         Assert.assertFalse(pending.getOperatorStates().isEmpty());
     }
 
@@ -406,7 +402,7 @@ public class PendingCheckpointTest {
         assertTrue(pending.areMasterStatesFullyAcknowledged());
         assertFalse(pending.areTasksFullyAcknowledged());
 
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
         assertTrue(pending.areTasksFullyAcknowledged());
 
         final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -459,7 +455,7 @@ public class PendingCheckpointTest {
         assertTrue(pending.areMasterStatesFullyAcknowledged());
         assertFalse(pending.areTasksFullyAcknowledged());
 
-        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
         assertTrue(pending.areTasksFullyAcknowledged());
 
         final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -570,7 +566,7 @@ public class PendingCheckpointTest {
                         Collections.emptyList(),
                         Executors.directExecutor());
 
-        checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+        checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
         return checkpoint;
     }
 
@@ -634,8 +630,20 @@ public class PendingCheckpointTest {
     }
 
     private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) {
+        abort(checkpoint, reason, null);
+    }
+
+    private void abort(
+            PendingCheckpoint checkpoint,
+            CheckpointFailureReason reason,
+            PendingCheckpointStats statsCallback) {
         checkpoint.abort(
-                reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+                reason,
+                null,
+                new CheckpointsCleaner(),
+                () -> {},
+                Executors.directExecutor(),
+                statsCallback);
     }
 
     private static final class QueueExecutor implements Executor {