Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/14 14:25:19 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #14635: [FLINK-19462][checkpointing] Update failed checkpoint stats

pnowojski commented on a change in pull request #14635:
URL: https://github.com/apache/flink/pull/14635#discussion_r557327924



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -312,15 +312,17 @@ public void notifyCheckpointComplete(
             long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning)
             throws Exception {
         if (isRunning.get()) {
-            LOG.debug("Notification of complete checkpoint for task {}", taskName);
+            LOG.debug(
+                    "Notification of completed checkpoint {} for task {}", taskName, checkpointId);

Review comment:
       switch `taskName` and `checkpointId`?
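
To illustrate why the order matters: `{}` placeholders are filled strictly in argument order, so the first argument lands in the "checkpoint" slot. The sketch below does not use the real logging library; `format` is a hypothetical stand-in that mimics positional `{}` substitution, and the task name and checkpoint id are made-up values.

```java
class PlaceholderOrder {
    // Hypothetical stand-in for "{}"-style log formatting: each "{}" is
    // replaced by the next argument, strictly in the order given.
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        String pattern = "Notification of completed checkpoint {} for task {}";
        String taskName = "Map (1/4)"; // made-up example value
        long checkpointId = 42L;       // made-up example value

        // Order as in the diff: the task name fills the checkpoint slot.
        System.out.println(format(pattern, taskName, checkpointId));
        // Swapped order: each value fills the slot that describes it.
        System.out.println(format(pattern, checkpointId, taskName));
    }
}
```

With the arguments as written in the diff, the message would read "checkpoint Map (1/4) for task 42"; swapping them gives "checkpoint 42 for task Map (1/4)".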

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -333,7 +335,7 @@ public void notifyCheckpointAborted(
 
         Exception previousException = null;
         if (isRunning.get()) {
-            LOG.debug("Notification of aborted checkpoint for task {}", taskName);
+            LOG.debug("Notification of aborted checkpoint {} for task {}", taskName, checkpointId);

Review comment:
       switch `taskName` and `checkpointId`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
##########
@@ -155,20 +155,22 @@ public void setResultSubpartitionStateFuture(
         this.resultSubpartitionStateFuture = resultSubpartitionStateFuture;
     }
 
-    public void cancel() throws Exception {
+    /** @return discarded state size (if available). */
+    public long cancel() throws Exception {

Review comment:
       Can this be unit tested? Maybe in `OperatorSnapshotFuturesTest`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
##########
@@ -75,7 +75,6 @@
 
         checkArgument(subtaskIndex >= 0, "Negative subtask index");
         this.subtaskIndex = subtaskIndex;
-        checkArgument(stateSize >= 0, "Negative state size");

Review comment:
       Why was this removed?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
##########
@@ -70,9 +70,11 @@ public static void bestEffortDiscardAllStateObjects(
      *
      * @param stateFuture to be discarded
      * @throws Exception if the discard operation failed
+     * @return the size of state before cancellation (if available)
      */
-    public static void discardStateFuture(Future<? extends StateObject> stateFuture)
+    public static long discardStateFuture(Future<? extends StateObject> stateFuture)
             throws Exception {
+        long stateSize = 0;

Review comment:
       Hmm, `-1` instead? I'm not sure which is better, but we could print `-1` as `N/A` in the WebUI.
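
A minimal sketch of the sentinel idea, assuming a negative size is taken to mean "unknown". The class, constant and method names are hypothetical, and this is not the WebUI's actual rendering code.

```java
class StateSizeDisplay {
    // Assumed convention for this sketch: a negative size means "not available".
    static final long UNKNOWN_SIZE = -1L;

    // Renders a state size for display; negative values become "N/A".
    static String render(long stateSizeBytes) {
        return stateSizeBytes < 0 ? "N/A" : stateSizeBytes + " B";
    }

    public static void main(String[] args) {
        System.out.println(render(UNKNOWN_SIZE)); // unknown size
        System.out.println(render(0L));           // known size: nothing persisted
        System.out.println(render(2048L));        // known size
    }
}
```

The trade-off is that `0` cannot distinguish "nothing was persisted" from "size unknown", whereas `-1` can, at the cost of every consumer having to handle the sentinel.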

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
##########
@@ -46,8 +48,11 @@
     /** Is the checkpoint completed as an unaligned checkpoint. */
     private final boolean unalignedCheckpoint;
 
+    private final long totalBytesPersisted;
+
+    @VisibleForTesting
     public CheckpointMetrics() {
-        this(-1L, -1L, -1L, -1L, -1L, -1L, false);
+        this(-1L, -1L, -1L, -1L, -1L, -1L, false, 0L);

Review comment:
       nit: `-1L` for the sake of consistency?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
##########
@@ -87,15 +72,20 @@
             @Nullable SubtaskStateStats latestAcknowledgedSubtask,
             @Nullable Throwable cause) {
 
-        super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
+        super(
+                checkpointId,
+                triggerTimestamp,
+                props,
+                totalSubtaskCount,
+                numAcknowledgedSubtasks,
+                taskStats,
+                PendingCheckpointStatsCallback.noOp(),

Review comment:
       I don't understand this. How is this supposed to work? How are stats reported for failed checkpoints? I get that in this commit you would like to use a temporary `noOp`, but I don't see it being replaced later. Is something missing, is this dead code, or am I missing something?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
##########
@@ -70,9 +70,11 @@ public static void bestEffortDiscardAllStateObjects(
      *
      * @param stateFuture to be discarded
      * @throws Exception if the discard operation failed
+     * @return the size of state before cancellation (if available)
      */
-    public static void discardStateFuture(Future<? extends StateObject> stateFuture)
+    public static long discardStateFuture(Future<? extends StateObject> stateFuture)

Review comment:
       Can this be unit tested? Maybe in `OperatorSnapshotFuturesTest`?
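
One possible shape for such a test, sketched with JDK classes only. `SizedState` and `RecordingState` are hypothetical stand-ins for the real state handle types, and the body of `discardStateFuture` is an assumption about the intended behavior (return `0` when the future could be cancelled, otherwise discard the result and return its size), not the code from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class DiscardSketch {
    /** Hypothetical stand-in for a state handle; not the real StateObject. */
    interface SizedState {
        long getStateSize();

        void discardState() throws Exception;
    }

    /** Test double that remembers whether it was discarded. */
    static class RecordingState implements SizedState {
        final long size;
        boolean discarded;

        RecordingState(long size) {
            this.size = size;
        }

        public long getStateSize() {
            return size;
        }

        public void discardState() {
            discarded = true;
        }
    }

    /** Assumed behavior: cancel if possible, else discard the result and report its size. */
    static long discardStateFuture(Future<? extends SizedState> stateFuture) throws Exception {
        if (stateFuture == null || stateFuture.cancel(true)) {
            return 0L; // nothing was materialized, so no size is known
        }
        SizedState state = stateFuture.get(); // already done, does not block
        if (state == null) {
            return 0L;
        }
        long size = state.getStateSize();
        state.discardState();
        return size;
    }

    public static void main(String[] args) throws Exception {
        RecordingState done = new RecordingState(128L);
        long discardedSize = discardStateFuture(CompletableFuture.completedFuture(done));
        System.out.println(discardedSize + " " + done.discarded);

        // A future that never completed can be cancelled, so no size is reported.
        System.out.println(discardStateFuture(new CompletableFuture<RecordingState>()));
    }
}
```

The two cases worth asserting are the completed future (size returned, state discarded) and the still-pending future (cancelled, size `0`).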

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
##########
@@ -60,6 +61,39 @@
     /** Stats of the latest acknowledged subtask. */
     private volatile SubtaskStateStats latestAcknowledgedSubtask;
 
+    /**
+     * Creates a tracker for a {@link PendingCheckpoint}.
+     *
+     * @param checkpointId ID of the checkpoint.
+     * @param triggerTimestamp Timestamp when the checkpoint was triggered.
+     * @param props Checkpoint properties of the checkpoint.
+     * @param taskStats Task stats for each involved operator.
+     * @param trackerCallback Callback for the {@link CheckpointStatsTracker}.
+     */
+    PendingCheckpointStats(
+            long checkpointId,
+            long triggerTimestamp,
+            CheckpointProperties props,
+            Map<JobVertexID, Integer> taskStats,
+            CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) {
+        this(
+                checkpointId,
+                triggerTimestamp,
+                props,
+                taskStats.values().stream().mapToInt(i -> i).sum(),
+                0,
+                taskStats.entrySet().stream()
+                        .collect(
+                                toConcurrentMap(
+                                        Map.Entry::getKey,
+                                        e -> new TaskStateStats(e.getKey(), e.getValue()))),
+                trackerCallback,
+                0,
+                0,
+                0,
+                null);

Review comment:
       I cannot find a matching constructor for this call, either in this commit or on master. Is this a rebasing issue?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -1051,6 +1051,34 @@ public void acknowledgeCheckpoint(
         }
     }
 
+    @Override
+    public void reportCheckpointMetrics(
+            JobID jobID, ExecutionAttemptID attemptId, long id, CheckpointMetrics metrics) {
+        mainThreadExecutor.assertRunningInMainThread();
+
+        final CheckpointCoordinator checkpointCoordinator =
+                executionGraph.getCheckpointCoordinator();
+
+        if (checkpointCoordinator != null) {
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            checkpointCoordinator.reportStats(id, attemptId, metrics);
+                        } catch (Throwable t) {
+                            log.warn("Error while processing report checkpoint stats message", t);
+                        }
+                    });
+        } else {
+            String errorMessage =
+                    "Received ReportCheckpointStats message for job {} with no CheckpointCoordinator";
+            if (executionGraph.getState() == JobStatus.RUNNING) {
+                log.error(errorMessage, jobGraph.getJobID());
+            } else {
+                log.debug(errorMessage, jobGraph.getJobID());
+            }
+        }
+    }

Review comment:
       Could we deduplicate those lines (error handling and logging) with `acknowledgeCheckpoint`? They seem to differ only by a lambda function and string messages, which maybe could be easily extracted as parameters?
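
A rough sketch of the extraction, using only JDK types. All names here (`CoordinatorCalls`, `withCoordinator`, `ThrowingConsumer`) are hypothetical, the `log` list stands in for the real logger, and a `StringBuilder` stands in for the coordinator; it only illustrates passing the action and the message name as parameters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class CoordinatorCalls {
    /** Hypothetical action type that is allowed to throw. */
    interface ThrowingConsumer<T> {
        void accept(T value) throws Exception;
    }

    // Stand-in for the logger so the sketch stays self-contained (not thread-safe).
    final List<String> log = new ArrayList<>();

    /** Shared null check, async execution and error handling; only the action and name vary. */
    <C> void withCoordinator(
            C coordinator,
            Executor ioExecutor,
            String messageName,
            boolean jobRunning,
            ThrowingConsumer<C> action) {
        if (coordinator != null) {
            ioExecutor.execute(
                    () -> {
                        try {
                            action.accept(coordinator);
                        } catch (Throwable t) {
                            log.add("WARN Error while processing " + messageName + " message: " + t);
                        }
                    });
        } else {
            // A missing coordinator is only an error while the job is running.
            String level = jobRunning ? "ERROR" : "DEBUG";
            log.add(level + " Received " + messageName + " message with no CheckpointCoordinator");
        }
    }

    public static void main(String[] args) {
        CoordinatorCalls calls = new CoordinatorCalls();
        Executor direct = Runnable::run; // runs the task on the calling thread
        StringBuilder coordinator = new StringBuilder(); // placeholder coordinator object

        calls.withCoordinator(coordinator, direct, "AcknowledgeCheckpoint", true, c -> c.append("ack;"));
        calls.withCoordinator(coordinator, direct, "ReportCheckpointStats", true, c -> c.append("stats;"));
        calls.withCoordinator((StringBuilder) null, direct, "ReportCheckpointStats", false, c -> c.append("x"));

        System.out.println(coordinator);
        System.out.println(calls.log);
    }
}
```

Each caller then shrinks to a single `withCoordinator(...)` invocation that supplies its own lambda and message name.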



