You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 09:58:24 UTC

[flink] 04/04: [FLINK-25958][runtime] Report failed checkpoint statistics if adding the completed checkpoint to the checkpoint store fails

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fbfdb0e468356fe71826eb6b185ecda9bc8b1de3
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:59:58 2022 +0100

    [FLINK-25958][runtime] Report failed checkpoint statistics if adding the completed checkpoint to the checkpoint store fails
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 15 ++++++++---
 .../CheckpointCoordinatorFailureTest.java          | 31 ++++++++++++++++++++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 10 ++++++-
 3 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5deee51..475effc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1339,9 +1339,7 @@ public class CheckpointCoordinator {
         try {
             final CompletedCheckpoint completedCheckpoint =
                     pendingCheckpoint.finalizeCheckpoint(
-                            checkpointsCleaner,
-                            this::scheduleTriggerRequest,
-                            executor);
+                            checkpointsCleaner, this::scheduleTriggerRequest, executor);
 
             failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
             return completedCheckpoint;
@@ -1401,6 +1399,7 @@ public class CheckpointCoordinator {
                 checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
             }
 
+            reportFailedCheckpoint(checkpointId, exception);
             sendAbortedMessages(tasksToAbort, checkpointId, completedCheckpoint.getTimestamp());
             throw new CheckpointException(
                     "Could not complete the pending checkpoint " + checkpointId + '.',
@@ -1409,6 +1408,16 @@ public class CheckpointCoordinator {
         }
     }
 
+    private void reportFailedCheckpoint(long checkpointId, Exception exception) {
+        PendingCheckpointStats pendingCheckpointStats =
+                statsTracker.getPendingCheckpointStats(checkpointId);
+        if (pendingCheckpointStats != null) {
+            statsTracker.reportFailedCheckpoint(
+                    pendingCheckpointStats.toFailedCheckpoint(
+                            System.currentTimeMillis(), exception));
+        }
+    }
+
     void scheduleTriggerRequest() {
         synchronized (lock) {
             if (isShutdown()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index c02ee16..3674515 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
@@ -49,6 +50,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.emptyList;
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.assertStatsMetrics;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -204,6 +206,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 new FailingCompletedCheckpointStore(failure);
 
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
         final AtomicInteger cleanupCallCount = new AtomicInteger(0);
         final CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -226,14 +230,27 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                                 })
                         .setCompletedCheckpointStore(completedCheckpointStore)
                         .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCheckpointStatsTracker(statsTracker)
                         .build();
         checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-
+        CheckpointMetrics expectedReportedMetrics =
+                new CheckpointMetricsBuilder()
+                        .setTotalBytesPersisted(18)
+                        .setBytesPersistedOfThisCheckpoint(18)
+                        .setBytesProcessedDuringAlignment(19)
+                        .setAsyncDurationMillis(20)
+                        .setAlignmentDurationNanos(123 * 1_000_000)
+                        .setCheckpointStartDelayNanos(567 * 1_000_000)
+                        .build();
         try {
             checkpointCoordinator.receiveAcknowledgeMessage(
                     new AcknowledgeCheckpoint(
-                            graph.getJobID(), attemptId, checkpointIDCounter.getLast()),
+                            graph.getJobID(),
+                            attemptId,
+                            checkpointIDCounter.getLast(),
+                            expectedReportedMetrics,
+                            new TaskStateSnapshot()),
                     "unknown location");
             fail("CheckpointException should have been thrown.");
         } catch (CheckpointException e) {
@@ -242,6 +259,16 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                     is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
         }
 
+        AbstractCheckpointStats actualStats =
+                statsTracker
+                        .createSnapshot()
+                        .getHistory()
+                        .getCheckpointById(checkpointIDCounter.getLast());
+
+        assertEquals(checkpointIDCounter.getLast(), actualStats.getCheckpointId());
+        assertEquals(CheckpointStatsStatus.FAILED, actualStats.getStatus());
+        assertStatsMetrics(vertex.getJobvertexId(), 0, expectedReportedMetrics, actualStats);
+
         assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index df8d606..3c09945 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,8 +299,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
             AbstractCheckpointStats actual) {
         assertEquals(checkpointId, actual.getCheckpointId());
         assertEquals(CheckpointStatsStatus.FAILED, actual.getStatus());
-        assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
         assertEquals(0, actual.getNumberOfAcknowledgedSubtasks());
+        assertStatsMetrics(jobVertexID, subtasIdx, expected, actual);
+    }
+
+    public static void assertStatsMetrics(
+            JobVertexID jobVertexID,
+            int subtasIdx,
+            CheckpointMetrics expected,
+            AbstractCheckpointStats actual) {
+        assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
         SubtaskStateStats taskStats =
                 actual.getAllTaskStateStats().stream()
                         .filter(s -> s.getJobVertexId().equals(jobVertexID))