You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 17:00:32 UTC

[flink] branch master updated (26d7c09 -> ffe353a)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 26d7c09  [FLINK-26407][end-to-end-tests] Increase timeouts for MetricsAvailabilityITCase.
     new 5ce2e06  [FLINK-26049][checkpoint] initialize CheckpointLocation after create PendingCheckpoint
     new 33e3b75  [FLINK-26049][checkpoint] Moving checkpoint failure log and report failed checkpoint to CheckpointFailureManager
     new ffe353a  [FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without pending checkpoint

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/checkpoint/CheckpointCoordinator.java  | 113 +++++++++------------
 .../checkpoint/CheckpointFailureManager.java       |  52 ++++++++--
 .../runtime/checkpoint/CheckpointStatsCounts.java  |   9 ++
 .../runtime/checkpoint/CheckpointStatsTracker.java |  15 +++
 .../runtime/checkpoint/PendingCheckpoint.java      |  30 ++----
 .../checkpoint/CheckpointCoordinatorTest.java      |  56 +++++++++-
 .../checkpoint/CheckpointStatsCountsTest.java      |   7 ++
 .../checkpoint/CheckpointStatsTrackerTest.java     |   9 ++
 .../runtime/checkpoint/PendingCheckpointTest.java  |  31 +++---
 9 files changed, 211 insertions(+), 111 deletions(-)

[flink] 03/03: [FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without pending checkpoint

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ffe353a305e677340d344d3fa45994536757e323
Author: fanrui <19...@gmail.com>
AuthorDate: Mon Feb 28 18:12:34 2022 +0800

    [FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without pending checkpoint
---
 .../checkpoint/CheckpointFailureManager.java       |  2 +
 .../runtime/checkpoint/CheckpointStatsCounts.java  |  9 +++++
 .../runtime/checkpoint/CheckpointStatsTracker.java | 15 +++++++
 .../checkpoint/CheckpointCoordinatorTest.java      | 46 ++++++++++++++++++++++
 .../checkpoint/CheckpointStatsCountsTest.java      |  7 ++++
 .../checkpoint/CheckpointStatsTrackerTest.java     |  9 +++++
 6 files changed, 88 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index df429a4..0205adc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -132,6 +132,8 @@ public class CheckpointFailureManager {
             long failureTimestamp = System.currentTimeMillis();
             statsTracker.reportFailedCheckpoint(
                     pendingCheckpointStats.toFailedCheckpoint(failureTimestamp, exception));
+        } else {
+            statsTracker.reportFailedCheckpointsWithoutInProgress();
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
index 8d06a1b..7265bb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -163,6 +163,15 @@ public class CheckpointStatsCounts implements Serializable {
     }
 
     /**
+     * Increments the failed and total checkpoint counts for a failure with no in-progress
+     * checkpoint, e.g. when triggering fails before the PendingCheckpoint is created.
+     */
+    void incrementFailedCheckpointsWithoutInProgress() {
+        numFailedCheckpoints++;
+        numTotalCheckpoints++;
+    }
+
+    /**
      * Creates a snapshot of the current state.
      *
      * @return Snapshot of the current state.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index f10a668..7971493 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -232,6 +232,21 @@ public class CheckpointStatsTracker {
         }
     }
 
+    /**
+     * Callback for a checkpoint failure that has no in-progress checkpoint. For example, it should
+     * be called when triggering a checkpoint fails before the PendingCheckpoint is created.
+     */
+    public void reportFailedCheckpointsWithoutInProgress() {
+        statsReadWriteLock.lock();
+        try {
+            counts.incrementFailedCheckpointsWithoutInProgress();
+
+            dirty = true;
+        } finally {
+            statsReadWriteLock.unlock();
+        }
+    }
+
     public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) {
         statsReadWriteLock.lock();
         try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index b493ef4..30af82f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -129,6 +129,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyList;
@@ -3113,6 +3114,43 @@ public class CheckpointCoordinatorTest extends TestLogger {
         testTriggerCheckpoint(checkpointCoordinator, PERIODIC_SCHEDULER_SHUTDOWN);
     }
 
+    /** Tests that no checkpoint is triggered when CheckpointIDCounter throws an IOException. */
+    @Test
+    public void testTriggerCheckpointWithCounterIOException() throws Exception {
+        // given: Checkpoint coordinator which fails on getCheckpointId.
+        IOExceptionCheckpointIDCounter testingCounter = new IOExceptionCheckpointIDCounter();
+        TestFailJobCallback failureCallback = new TestFailJobCallback();
+
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointIDCounter(testingCounter)
+                        .setFailureManager(new CheckpointFailureManager(0, failureCallback))
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCheckpointStatsTracker(statsTracker)
+                        .build();
+        testingCounter.setOwner(checkpointCoordinator);
+
+        // when: The checkpoint is triggered.
+        testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
+
+        // then: Failure manager should fail the job.
+        assertEquals(1, failureCallback.getInvokeCounter());
+
+        // then: The NumberOfFailedCheckpoints and TotalNumberOfCheckpoints should be 1.
+        CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts();
+        assertEquals(0, counts.getNumberOfRestoredCheckpoints());
+        assertEquals(1, counts.getTotalNumberOfCheckpoints());
+        assertEquals(0, counts.getNumberOfInProgressCheckpoints());
+        assertEquals(0, counts.getNumberOfCompletedCheckpoints());
+        assertEquals(1, counts.getNumberOfFailedCheckpoints());
+
+        // then: The PendingCheckpoint shouldn't be created.
+        assertNull(statsTracker.getPendingCheckpointStats(1));
+    }
+
     private void testTriggerCheckpoint(
             CheckpointCoordinator checkpointCoordinator,
             CheckpointFailureReason expectedFailureReason)
@@ -3848,6 +3886,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         }
     }
 
+    private static class IOExceptionCheckpointIDCounter extends CheckpointIDCounterWithOwner {
+        @Override
+        public long getAndIncrement() throws Exception {
+            checkNotNull(owner);
+            throw new IOException("disk is error!");
+        }
+    }
+
     private static class IOExceptionCheckpointStorage extends JobManagerCheckpointStorage {
         @Override
         public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
index c3ffd6d..5d60810 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
@@ -72,6 +72,13 @@ public class CheckpointStatsCountsTest {
         assertEquals(0, counts.getNumberOfInProgressCheckpoints());
         assertEquals(1, counts.getNumberOfCompletedCheckpoints());
         assertEquals(1, counts.getNumberOfFailedCheckpoints());
+
+        counts.incrementFailedCheckpointsWithoutInProgress();
+        assertEquals(1, counts.getNumberOfRestoredCheckpoints());
+        assertEquals(3, counts.getTotalNumberOfCheckpoints());
+        assertEquals(0, counts.getNumberOfInProgressCheckpoints());
+        assertEquals(1, counts.getNumberOfCompletedCheckpoints());
+        assertEquals(2, counts.getNumberOfFailedCheckpoints());
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 8e7b0bd..14b3a94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -172,6 +172,15 @@ public class CheckpointStatsTrackerTest {
         assertEquals(2, counts.getNumberOfCompletedCheckpoints());
         assertEquals(1, counts.getNumberOfFailedCheckpoints());
 
+        tracker.reportFailedCheckpointsWithoutInProgress();
+
+        CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
+        counts = snapshot1.getCounts();
+        assertEquals(5, counts.getTotalNumberOfCheckpoints());
+        assertEquals(1, counts.getNumberOfInProgressCheckpoints());
+        assertEquals(2, counts.getNumberOfCompletedCheckpoints());
+        assertEquals(2, counts.getNumberOfFailedCheckpoints());
+
         // Summary stats
         CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats();
         assertEquals(2, summary.getStateSizeStats().getCount());

[flink] 01/03: [FLINK-26049][checkpoint] initialize CheckpointLocation after create PendingCheckpoint

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ce2e062cdb1c7dfd7e641cd7f10de04440a0583
Author: fanrui <19...@gmail.com>
AuthorDate: Mon Feb 28 11:35:09 2022 +0800

    [FLINK-26049][checkpoint] initialize CheckpointLocation after create PendingCheckpoint
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 86 +++++++++++-----------
 .../runtime/checkpoint/PendingCheckpoint.java      | 16 ++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 10 ++-
 .../runtime/checkpoint/PendingCheckpointTest.java  | 24 +++---
 4 files changed, 75 insertions(+), 61 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 475effc..4efff80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -545,14 +545,12 @@ public class CheckpointCoordinator {
                             .thenApplyAsync(
                                     plan -> {
                                         try {
-                                            CheckpointIdAndStorageLocation
-                                                    checkpointIdAndStorageLocation =
-                                                            initializeCheckpoint(
-                                                                    request.props,
-                                                                    request.externalSavepointLocation,
-                                                                    initializeBaseLocations);
-                                            return new Tuple2<>(
-                                                    plan, checkpointIdAndStorageLocation);
+                                            // this must happen outside the coordinator-wide lock,
+                                            // because it communicates with external services
+                                            // (in HA mode) and may block for a while.
+                                            long checkpointID =
+                                                    checkpointIdCounter.getAndIncrement();
+                                            return new Tuple2<>(plan, checkpointID);
                                         } catch (Throwable e) {
                                             throw new CompletionException(e);
                                         }
@@ -565,20 +563,42 @@ public class CheckpointCoordinator {
                                                     request.props,
                                                     checkpointInfo.f0,
                                                     request.isPeriodic,
-                                                    checkpointInfo.f1.checkpointId,
-                                                    checkpointInfo.f1.checkpointStorageLocation,
+                                                    checkpointInfo.f1,
                                                     request.getOnCompletionFuture()),
                                     timer);
 
             final CompletableFuture<?> coordinatorCheckpointsComplete =
-                    pendingCheckpointCompletableFuture.thenComposeAsync(
-                            (pendingCheckpoint) ->
-                                    OperatorCoordinatorCheckpoints
-                                            .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
-                                                    coordinatorsToCheckpoint,
-                                                    pendingCheckpoint,
-                                                    timer),
-                            timer);
+                    pendingCheckpointCompletableFuture
+                            .thenApplyAsync(
+                                    pendingCheckpoint -> {
+                                        try {
+                                            CheckpointStorageLocation checkpointStorageLocation =
+                                                    initializeCheckpointLocation(
+                                                            pendingCheckpoint.getCheckpointID(),
+                                                            request.props,
+                                                            request.externalSavepointLocation,
+                                                            initializeBaseLocations);
+                                            return Tuple2.of(
+                                                    pendingCheckpoint, checkpointStorageLocation);
+                                        } catch (Throwable e) {
+                                            throw new CompletionException(e);
+                                        }
+                                    },
+                                    executor)
+                            .thenComposeAsync(
+                                    (checkpointInfo) -> {
+                                        PendingCheckpoint pendingCheckpoint = checkpointInfo.f0;
+                                        synchronized (lock) {
+                                            pendingCheckpoint.setCheckpointTargetLocation(
+                                                    checkpointInfo.f1);
+                                        }
+                                        return OperatorCoordinatorCheckpoints
+                                                .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+                                                        coordinatorsToCheckpoint,
+                                                        pendingCheckpoint,
+                                                        timer);
+                                    },
+                                    timer);
 
             // We have to take the snapshot of the master hooks after the coordinator checkpoints
             // has completed.
@@ -726,24 +746,20 @@ public class CheckpointCoordinator {
     }
 
     /**
-     * Initialize the checkpoint trigger asynchronously. It will expected to be executed in io
+     * Initialize the checkpoint location asynchronously. It is expected to be executed in io
      * thread due to it might be time-consuming.
      *
+     * @param checkpointID checkpoint id
      * @param props checkpoint properties
      * @param externalSavepointLocation the external savepoint location, it might be null
-     * @return the initialized result, checkpoint id and checkpoint location
+     * @return the checkpoint location
      */
-    private CheckpointIdAndStorageLocation initializeCheckpoint(
+    private CheckpointStorageLocation initializeCheckpointLocation(
+            long checkpointID,
             CheckpointProperties props,
             @Nullable String externalSavepointLocation,
             boolean initializeBaseLocations)
             throws Exception {
-
-        // this must happen outside the coordinator-wide lock, because it
-        // communicates
-        // with external services (in HA mode) and may block for a while.
-        long checkpointID = checkpointIdCounter.getAndIncrement();
-
         final CheckpointStorageLocation checkpointStorageLocation;
         if (props.isSavepoint()) {
             checkpointStorageLocation =
@@ -757,7 +773,7 @@ public class CheckpointCoordinator {
                     checkpointStorageView.initializeLocationForCheckpoint(checkpointID);
         }
 
-        return new CheckpointIdAndStorageLocation(checkpointID, checkpointStorageLocation);
+        return checkpointStorageLocation;
     }
 
     private PendingCheckpoint createPendingCheckpoint(
@@ -766,7 +782,6 @@ public class CheckpointCoordinator {
             CheckpointPlan checkpointPlan,
             boolean isPeriodic,
             long checkpointID,
-            CheckpointStorageLocation checkpointStorageLocation,
             CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
 
         synchronized (lock) {
@@ -791,7 +806,6 @@ public class CheckpointCoordinator {
                         OperatorInfo.getIds(coordinatorsToCheckpoint),
                         masterHooks.keySet(),
                         props,
-                        checkpointStorageLocation,
                         onCompletionPromise,
                         pendingCheckpointStats);
 
@@ -2161,18 +2175,6 @@ public class CheckpointCoordinator {
         }
     }
 
-    private static class CheckpointIdAndStorageLocation {
-        private final long checkpointId;
-        private final CheckpointStorageLocation checkpointStorageLocation;
-
-        CheckpointIdAndStorageLocation(
-                long checkpointId, CheckpointStorageLocation checkpointStorageLocation) {
-
-            this.checkpointId = checkpointId;
-            this.checkpointStorageLocation = checkNotNull(checkpointStorageLocation);
-        }
-    }
-
     static class CheckpointTriggerRequest {
         final long timestamp;
         final CheckpointProperties props;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 08cd23f..9229024 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -107,14 +107,14 @@ public class PendingCheckpoint implements Checkpoint {
     /** The checkpoint properties. */
     private final CheckpointProperties props;
 
-    /** Target storage location to persist the checkpoint metadata to. */
-    private final CheckpointStorageLocation targetLocation;
-
     /** The promise to fulfill once the checkpoint has been completed. */
     private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
 
     @Nullable private final PendingCheckpointStats pendingCheckpointStats;
 
+    /** Target storage location to persist the checkpoint metadata to. */
+    @Nullable private CheckpointStorageLocation targetLocation;
+
     private int numAcknowledgedTasks;
 
     private boolean disposed;
@@ -135,7 +135,6 @@ public class PendingCheckpoint implements Checkpoint {
             Collection<OperatorID> operatorCoordinatorsToConfirm,
             Collection<String> masterStateIdentifiers,
             CheckpointProperties props,
-            CheckpointStorageLocation targetLocation,
             CompletableFuture<CompletedCheckpoint> onCompletionPromise,
             @Nullable PendingCheckpointStats pendingCheckpointStats) {
         checkArgument(
@@ -153,7 +152,6 @@ public class PendingCheckpoint implements Checkpoint {
         }
 
         this.props = checkNotNull(props);
-        this.targetLocation = checkNotNull(targetLocation);
 
         this.operatorStates = new HashMap<>();
         this.masterStates = new ArrayList<>(masterStateIdentifiers.size());
@@ -191,6 +189,10 @@ public class PendingCheckpoint implements Checkpoint {
         return checkpointId;
     }
 
+    public void setCheckpointTargetLocation(CheckpointStorageLocation targetLocation) {
+        this.targetLocation = targetLocation;
+    }
+
     public CheckpointStorageLocation getCheckpointStorageLocation() {
         return targetLocation;
     }
@@ -645,7 +647,9 @@ public class PendingCheckpoint implements Checkpoint {
             // unregistered shared states are still considered private at this point.
             try {
                 StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-                targetLocation.disposeOnFailure();
+                if (targetLocation != null) {
+                    targetLocation.disposeOnFailure();
+                }
             } catch (Throwable t) {
                 LOG.warn(
                         "Could not properly dispose the private states in the pending checkpoint {} of job {}.",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3c09945..b493ef4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -727,11 +727,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that do not trigger checkpoint when IOException occurred. */
     @Test
-    public void testTriggerCheckpointAfterIOException() throws Exception {
-        // given: Checkpoint coordinator which fails on initializeLocationForCheckpoint.
+    public void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
+        // given: Checkpoint coordinator which fails on initializeCheckpointLocation.
         TestFailJobCallback failureCallback = new TestFailJobCallback();
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(statsTracker)
                         .setFailureManager(new CheckpointFailureManager(0, failureCallback))
                         .setCheckpointStorage(new IOExceptionCheckpointStorage())
                         .setTimer(manuallyTriggeredScheduledExecutor)
@@ -741,6 +744,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // then: Failure manager should fail the job.
         assertEquals(1, failureCallback.getInvokeCounter());
+
+        // then: The PendingCheckpoint should have been created.
+        assertNotNull(statsTracker.getPendingCheckpointStats(1));
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 799d743..a0b07b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -634,17 +634,19 @@ public class PendingCheckpointTest {
                         1024,
                         4096);
 
-        return new PendingCheckpoint(
-                new JobID(),
-                0,
-                1,
-                checkpointPlan,
-                operatorCoordinators,
-                masterStateIdentifiers,
-                props,
-                location,
-                new CompletableFuture<>(),
-                null);
+        PendingCheckpoint pendingCheckpoint =
+                new PendingCheckpoint(
+                        new JobID(),
+                        0,
+                        1,
+                        checkpointPlan,
+                        operatorCoordinators,
+                        masterStateIdentifiers,
+                        props,
+                        new CompletableFuture<>(),
+                        null);
+        pendingCheckpoint.setCheckpointTargetLocation(location);
+        return pendingCheckpoint;
     }
 
     @SuppressWarnings("unchecked")

[flink] 02/03: [FLINK-26049][checkpoint] Moving checkpoint failure log and report failed checkpoint to CheckpointFailureManager

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33e3b75bb74dad5f75ad0bea788906e24c3df4e8
Author: fanrui <19...@gmail.com>
AuthorDate: Mon Feb 28 17:45:12 2022 +0800

    [FLINK-26049][checkpoint] Moving checkpoint failure log and report failed checkpoint to CheckpointFailureManager
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 27 +++---------
 .../checkpoint/CheckpointFailureManager.java       | 50 +++++++++++++++++++---
 .../runtime/checkpoint/PendingCheckpoint.java      | 14 ------
 .../runtime/checkpoint/PendingCheckpointTest.java  |  7 ---
 4 files changed, 48 insertions(+), 50 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 4efff80..2efc034 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -79,7 +79,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
@@ -177,9 +176,6 @@ public class CheckpointCoordinator {
     /** Actor that receives status updates from the execution graph this coordinator works for. */
     private JobStatusListener jobStatusListener;
 
-    /** The number of consecutive failed trigger attempts. */
-    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
-
     /** A handle to the current periodic trigger, to cancel it when necessary. */
     private ScheduledFuture<?> currentPeriodicTrigger;
 
@@ -886,7 +882,6 @@ public class CheckpointCoordinator {
     /** Trigger request is successful. NOTE, it must be invoked if trigger request is successful. */
     private void onTriggerSuccess() {
         isTriggering = false;
-        numUnsuccessfulCheckpointsTriggers.set(0);
         executeQueuedRequest();
     }
 
@@ -935,25 +930,12 @@ public class CheckpointCoordinator {
                             CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
 
             if (checkpoint != null && !checkpoint.isDisposed()) {
-                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-                LOG.warn(
-                        "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
-                        checkpoint.getCheckpointId(),
-                        job,
-                        numUnsuccessful,
-                        throwable);
-
                 synchronized (lock) {
                     abortPendingCheckpoint(checkpoint, cause);
                 }
             } else {
-                LOG.info(
-                        "Failed to trigger checkpoint for job {} because {}",
-                        job,
-                        throwable.getMessage());
-
                 failureManager.handleCheckpointException(
-                        checkpoint, checkpointProperties, cause, null);
+                        checkpoint, checkpointProperties, cause, null, job, null, statsTracker);
             }
         } finally {
             isTriggering = false;
@@ -1917,8 +1899,6 @@ public class CheckpointCoordinator {
             final CheckpointException reason =
                     new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND);
             abortPendingAndQueuedCheckpoints(reason);
-
-            numUnsuccessfulCheckpointsTriggers.set(0);
         }
     }
 
@@ -2098,7 +2078,10 @@ public class CheckpointCoordinator {
                         pendingCheckpoint,
                         pendingCheckpoint.getProps(),
                         exception,
-                        executionAttemptID);
+                        executionAttemptID,
+                        job,
+                        getStatsCallback(pendingCheckpoint),
+                        statsTracker);
             } finally {
                 sendAbortedMessages(
                         pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 8ccccc6..df429a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -17,10 +17,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.Set;
@@ -34,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /** The checkpoint failure manager which centralized manage checkpoint failure processing logic. */
 public class CheckpointFailureManager {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointFailureManager.class);
+
     public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
     public static final String EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE =
             "Exceeded checkpoint tolerable failure threshold.";
@@ -80,25 +86,55 @@ public class CheckpointFailureManager {
      *     strategy can be used.
      * @param exception the checkpoint exception.
      * @param executionAttemptID the execution attempt id, as a safe guard.
+     * @param job the JobID.
+     * @param pendingCheckpointStats the pending checkpoint statistics.
+     * @param statsTracker the tracker for checkpoint statistics.
      */
     public void handleCheckpointException(
             @Nullable PendingCheckpoint pendingCheckpoint,
             CheckpointProperties checkpointProperties,
             CheckpointException exception,
-            @Nullable ExecutionAttemptID executionAttemptID) {
+            @Nullable ExecutionAttemptID executionAttemptID,
+            JobID job,
+            @Nullable PendingCheckpointStats pendingCheckpointStats,
+            CheckpointStatsTracker statsTracker) {
+        long checkpointId =
+                pendingCheckpoint == null
+                        ? UNKNOWN_CHECKPOINT_ID
+                        : pendingCheckpoint.getCheckpointID();
+        updateStatsAfterCheckpointFailed(pendingCheckpointStats, statsTracker, exception);
+
+        // This method handles failures both while triggering and after a pending checkpoint was
+        // created (e.g. when it is aborted), so the message must not claim "trigger" only.
+        // The counter is read before the handlers below run, so it may not yet count this failure.
+        LOG.warn(
+                "Failed to trigger or complete checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+                checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN_CHECKPOINT_ID" : checkpointId,
+                job,
+                continuousFailureCounter.get(),
+                exception);
         if (isJobManagerFailure(exception, executionAttemptID)) {
-            handleJobLevelCheckpointException(
-                    checkpointProperties,
-                    exception,
-                    pendingCheckpoint == null
-                            ? UNKNOWN_CHECKPOINT_ID
-                            : pendingCheckpoint.getCheckpointID());
+            handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId);
         } else {
             handleTaskLevelCheckpointException(
                     checkNotNull(pendingCheckpoint), exception, checkNotNull(executionAttemptID));
         }
     }
 
+    /**
+     * Reports a failed checkpoint to the statistics tracker; does nothing if no stats are given.
+     *
+     * @param pendingCheckpointStats the pending checkpoint statistics, or {@code null} if none.
+     * @param statsTracker the tracker that receives the failed checkpoint statistics.
+     * @param exception the checkpoint exception recorded as the failure cause.
+     */
+    private void updateStatsAfterCheckpointFailed(
+            @Nullable PendingCheckpointStats pendingCheckpointStats,
+            CheckpointStatsTracker statsTracker,
+            CheckpointException exception) {
+        if (pendingCheckpointStats != null) {
+            long failureTimestamp = System.currentTimeMillis();
+            statsTracker.reportFailedCheckpoint(
+                    pendingCheckpointStats.toFailedCheckpoint(failureTimestamp, exception));
+        }
+    }
+
     private boolean isJobManagerFailure(
             CheckpointException exception, @Nullable ExecutionAttemptID executionAttemptID) {
         // TODO: Try to get rid of checking nullability of executionAttemptID because false value of
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9229024..b4bd8ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -544,7 +544,6 @@ public class PendingCheckpoint implements Checkpoint {
         try {
             failureCause = new CheckpointException(reason, cause);
             onCompletionPromise.completeExceptionally(failureCause);
-            reportFailedCheckpoint(statsTracker, failureCause);
             assertAbortSubsumedForced(reason);
         } finally {
             dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -596,19 +595,6 @@ public class PendingCheckpoint implements Checkpoint {
         }
     }
 
-    /**
-     * Reports a failed checkpoint with the given optional cause.
-     *
-     * @param cause The failure cause or <code>null</code>.
-     */
-    private void reportFailedCheckpoint(CheckpointStatsTracker statsTracker, Exception cause) {
-        // to prevent null-pointers from concurrent modification, copy reference onto stack
-        if (pendingCheckpointStats != null) {
-            statsTracker.reportFailedCheckpoint(
-                    pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), cause));
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index a0b07b0..392bd0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -661,13 +661,6 @@ public class PendingCheckpointTest {
     }
 
+    /**
+     * Aborts {@code checkpoint} with {@code reason}, no cause, a fresh cleaner, a no-op post-cleanup
+     * action and a direct executor. NOTE(review): the trailing {@code null} is presumably the stats
+     * tracker, which abort no longer uses for failure reporting — confirm against the signature.
+     */
     private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) {
-        abort(checkpoint, reason, null);
-    }
-
-    private void abort(
-            PendingCheckpoint checkpoint,
-            CheckpointFailureReason reason,
-            PendingCheckpointStats statsCallback) {
         checkpoint.abort(
                 reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
     }