You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 17:00:35 UTC

[flink] 03/03: [FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without pending checkpoint

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ffe353a305e677340d344d3fa45994536757e323
Author: fanrui <19...@gmail.com>
AuthorDate: Mon Feb 28 18:12:34 2022 +0800

    [FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without pending checkpoint
---
 .../checkpoint/CheckpointFailureManager.java       |  2 +
 .../runtime/checkpoint/CheckpointStatsCounts.java  |  9 +++++
 .../runtime/checkpoint/CheckpointStatsTracker.java | 15 +++++++
 .../checkpoint/CheckpointCoordinatorTest.java      | 46 ++++++++++++++++++++++
 .../checkpoint/CheckpointStatsCountsTest.java      |  7 ++++
 .../checkpoint/CheckpointStatsTrackerTest.java     |  9 +++++
 6 files changed, 88 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index df429a4..0205adc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -132,6 +132,8 @@ public class CheckpointFailureManager {
             long failureTimestamp = System.currentTimeMillis();
             statsTracker.reportFailedCheckpoint(
                     pendingCheckpointStats.toFailedCheckpoint(failureTimestamp, exception));
+        } else {
+            statsTracker.reportFailedCheckpointsWithoutInProgress();
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
index 8d06a1b..7265bb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -163,6 +163,15 @@ public class CheckpointStatsCounts implements Serializable {
     }
 
     /**
+     * Increments the failed and total checkpoint counts for a failure that has no in-progress
+     * checkpoint, e.g. when triggering fails before a PendingCheckpoint is created.
+     */
+    void incrementFailedCheckpointsWithoutInProgress() {
+        numFailedCheckpoints++;
+        numTotalCheckpoints++; // the failed attempt is also counted in the total
+    }
+
+    /**
      * Creates a snapshot of the current state.
      *
      * @return Snapshot of the current state.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index f10a668..7971493 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -232,6 +232,21 @@ public class CheckpointStatsTracker {
         }
     }
 
+    /**
+     * Callback for a checkpoint failure that has no in-progress checkpoint. For example, it
+     * should be called when triggering fails before a PendingCheckpoint is created.
+     */
+    public void reportFailedCheckpointsWithoutInProgress() {
+        statsReadWriteLock.lock();
+        try {
+            counts.incrementFailedCheckpointsWithoutInProgress();
+
+            dirty = true; // NOTE(review): presumably marks the cached snapshot stale — confirm
+        } finally {
+            statsReadWriteLock.unlock();
+        }
+    }
+
     public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) {
         statsReadWriteLock.lock();
         try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index b493ef4..30af82f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -129,6 +129,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyList;
@@ -3113,6 +3114,43 @@ public class CheckpointCoordinatorTest extends TestLogger {
         testTriggerCheckpoint(checkpointCoordinator, PERIODIC_SCHEDULER_SHUTDOWN);
     }
 
+    /** Tests that a checkpoint trigger fails when CheckpointIDCounter throws an IOException. */
+    @Test
+    public void testTriggerCheckpointWithCounterIOException() throws Exception {
+        // given: A checkpoint coordinator whose ID counter fails on getAndIncrement.
+        IOExceptionCheckpointIDCounter testingCounter = new IOExceptionCheckpointIDCounter();
+        TestFailJobCallback failureCallback = new TestFailJobCallback();
+
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointIDCounter(testingCounter)
+                        .setFailureManager(new CheckpointFailureManager(0, failureCallback))
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCheckpointStatsTracker(statsTracker)
+                        .build();
+        testingCounter.setOwner(checkpointCoordinator);
+
+        // when: A checkpoint is triggered and is expected to fail with IO_EXCEPTION.
+        testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
+
+        // then: The fail-job callback should have been invoked exactly once.
+        assertEquals(1, failureCallback.getInvokeCounter());
+
+        // then: The failed and total counts should be 1; all other counts should stay 0.
+        CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts();
+        assertEquals(0, counts.getNumberOfRestoredCheckpoints());
+        assertEquals(1, counts.getTotalNumberOfCheckpoints());
+        assertEquals(0, counts.getNumberOfInProgressCheckpoints());
+        assertEquals(0, counts.getNumberOfCompletedCheckpoints());
+        assertEquals(1, counts.getNumberOfFailedCheckpoints());
+
+        // then: No pending checkpoint stats should exist for checkpoint ID 1.
+        assertNull(statsTracker.getPendingCheckpointStats(1));
+    }
+
     private void testTriggerCheckpoint(
             CheckpointCoordinator checkpointCoordinator,
             CheckpointFailureReason expectedFailureReason)
@@ -3848,6 +3886,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         }
     }
 
+    private static class IOExceptionCheckpointIDCounter extends CheckpointIDCounterWithOwner {
+        @Override
+        public long getAndIncrement() throws Exception {
+            checkNotNull(owner); // the test must set the owner before triggering
+            throw new IOException("disk is error!"); // unconditionally simulates an I/O failure
+        }
+    }
+
     private static class IOExceptionCheckpointStorage extends JobManagerCheckpointStorage {
         @Override
         public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
index c3ffd6d..5d60810 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
@@ -72,6 +72,13 @@ public class CheckpointStatsCountsTest {
         assertEquals(0, counts.getNumberOfInProgressCheckpoints());
         assertEquals(1, counts.getNumberOfCompletedCheckpoints());
         assertEquals(1, counts.getNumberOfFailedCheckpoints());
+
+        counts.incrementFailedCheckpointsWithoutInProgress();
+        assertEquals(1, counts.getNumberOfRestoredCheckpoints());
+        assertEquals(3, counts.getTotalNumberOfCheckpoints());
+        assertEquals(0, counts.getNumberOfInProgressCheckpoints());
+        assertEquals(1, counts.getNumberOfCompletedCheckpoints());
+        assertEquals(2, counts.getNumberOfFailedCheckpoints());
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 8e7b0bd..14b3a94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -172,6 +172,15 @@ public class CheckpointStatsTrackerTest {
         assertEquals(2, counts.getNumberOfCompletedCheckpoints());
         assertEquals(1, counts.getNumberOfFailedCheckpoints());
 
+        tracker.reportFailedCheckpointsWithoutInProgress();
+
+        CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
+        counts = snapshot1.getCounts();
+        assertEquals(5, counts.getTotalNumberOfCheckpoints());
+        assertEquals(1, counts.getNumberOfInProgressCheckpoints());
+        assertEquals(2, counts.getNumberOfCompletedCheckpoints());
+        assertEquals(2, counts.getNumberOfFailedCheckpoints());
+
         // Summary stats
         CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats();
         assertEquals(2, summary.getStateSizeStats().getCount());