You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 17:00:35 UTC
[flink] 03/03: [FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without pending checkpoint
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ffe353a305e677340d344d3fa45994536757e323
Author: fanrui <19...@gmail.com>
AuthorDate: Mon Feb 28 18:12:34 2022 +0800
[FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without pending checkpoint
---
.../checkpoint/CheckpointFailureManager.java | 2 +
.../runtime/checkpoint/CheckpointStatsCounts.java | 9 +++++
.../runtime/checkpoint/CheckpointStatsTracker.java | 15 +++++++
.../checkpoint/CheckpointCoordinatorTest.java | 46 ++++++++++++++++++++++
.../checkpoint/CheckpointStatsCountsTest.java | 7 ++++
.../checkpoint/CheckpointStatsTrackerTest.java | 9 +++++
6 files changed, 88 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index df429a4..0205adc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -132,6 +132,8 @@ public class CheckpointFailureManager {
long failureTimestamp = System.currentTimeMillis();
statsTracker.reportFailedCheckpoint(
pendingCheckpointStats.toFailedCheckpoint(failureTimestamp, exception));
+ } else {
+ statsTracker.reportFailedCheckpointsWithoutInProgress();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
index 8d06a1b..7265bb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -163,6 +163,15 @@ public class CheckpointStatsCounts implements Serializable {
}
/**
+ * Increments the failed and total checkpoint counts, leaving the in-progress count unchanged.
+ * Used for failures without an in-progress checkpoint, e.g. before a PendingCheckpoint exists.
+ */
+ void incrementFailedCheckpointsWithoutInProgress() {
+ numFailedCheckpoints++;
+ numTotalCheckpoints++;
+ }
+
+ /**
* Creates a snapshot of the current state.
*
* @return Snapshot of the current state.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index f10a668..7971493 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -232,6 +232,21 @@ public class CheckpointStatsTracker {
}
}
+ /**
+ * Reports a checkpoint failure that has no in-progress checkpoint, e.g. one that occurs while
+ * triggering, before a PendingCheckpoint is created. Only the failed and total counts change.
+ */
+ public void reportFailedCheckpointsWithoutInProgress() {
+ statsReadWriteLock.lock();
+ try {
+ counts.incrementFailedCheckpointsWithoutInProgress();
+ // Flag stats as modified; presumably lets the next createSnapshot() see the new counts - confirm.
+ dirty = true;
+ } finally {
+ statsReadWriteLock.unlock();
+ }
+ }
+
public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) {
statsReadWriteLock.lock();
try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index b493ef4..30af82f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -129,6 +129,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyList;
@@ -3113,6 +3114,43 @@ public class CheckpointCoordinatorTest extends TestLogger {
testTriggerCheckpoint(checkpointCoordinator, PERIODIC_SCHEDULER_SHUTDOWN);
}
+ /** Tests that no checkpoint is triggered when the CheckpointIDCounter throws an IOException. */
+ @Test
+ public void testTriggerCheckpointWithCounterIOException() throws Exception {
+ // given: Checkpoint coordinator whose ID counter throws an IOException from getAndIncrement().
+ IOExceptionCheckpointIDCounter testingCounter = new IOExceptionCheckpointIDCounter();
+ TestFailJobCallback failureCallback = new TestFailJobCallback();
+
+ CheckpointStatsTracker statsTracker =
+ new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+
+ CheckpointCoordinator checkpointCoordinator =
+ new CheckpointCoordinatorBuilder()
+ .setCheckpointIDCounter(testingCounter)
+ .setFailureManager(new CheckpointFailureManager(0, failureCallback))
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .setCheckpointStatsTracker(statsTracker)
+ .build();
+ testingCounter.setOwner(checkpointCoordinator);
+
+ // when: The checkpoint is triggered.
+ testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
+
+ // then: Failure manager should fail the job.
+ assertEquals(1, failureCallback.getInvokeCounter());
+
+ // then: Failed and total counts should be 1; restored, in-progress and completed should be 0.
+ CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts();
+ assertEquals(0, counts.getNumberOfRestoredCheckpoints());
+ assertEquals(1, counts.getTotalNumberOfCheckpoints());
+ assertEquals(0, counts.getNumberOfInProgressCheckpoints());
+ assertEquals(0, counts.getNumberOfCompletedCheckpoints());
+ assertEquals(1, counts.getNumberOfFailedCheckpoints());
+
+ // then: No PendingCheckpointStats should be registered, as no PendingCheckpoint was created.
+ assertNull(statsTracker.getPendingCheckpointStats(1));
+ }
+
private void testTriggerCheckpoint(
CheckpointCoordinator checkpointCoordinator,
CheckpointFailureReason expectedFailureReason)
@@ -3848,6 +3886,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
}
+ private static class IOExceptionCheckpointIDCounter extends CheckpointIDCounterWithOwner {
+ @Override
+ public long getAndIncrement() throws Exception {
+ checkNotNull(owner); // fails fast if the owner was never set
+ throw new IOException("disk is error!"); // always fails, simulating a broken ID counter
+ }
+ }
+
private static class IOExceptionCheckpointStorage extends JobManagerCheckpointStorage {
@Override
public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
index c3ffd6d..5d60810 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
@@ -72,6 +72,13 @@ public class CheckpointStatsCountsTest {
assertEquals(0, counts.getNumberOfInProgressCheckpoints());
assertEquals(1, counts.getNumberOfCompletedCheckpoints());
assertEquals(1, counts.getNumberOfFailedCheckpoints());
+
+ counts.incrementFailedCheckpointsWithoutInProgress();
+ assertEquals(1, counts.getNumberOfRestoredCheckpoints());
+ assertEquals(3, counts.getTotalNumberOfCheckpoints());
+ assertEquals(0, counts.getNumberOfInProgressCheckpoints());
+ assertEquals(1, counts.getNumberOfCompletedCheckpoints());
+ assertEquals(2, counts.getNumberOfFailedCheckpoints());
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 8e7b0bd..14b3a94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -172,6 +172,15 @@ public class CheckpointStatsTrackerTest {
assertEquals(2, counts.getNumberOfCompletedCheckpoints());
assertEquals(1, counts.getNumberOfFailedCheckpoints());
+ tracker.reportFailedCheckpointsWithoutInProgress();
+
+ CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
+ counts = snapshot1.getCounts();
+ assertEquals(5, counts.getTotalNumberOfCheckpoints());
+ assertEquals(1, counts.getNumberOfInProgressCheckpoints());
+ assertEquals(2, counts.getNumberOfCompletedCheckpoints());
+ assertEquals(2, counts.getNumberOfFailedCheckpoints());
+
// Summary stats
CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats();
assertEquals(2, summary.getStateSizeStats().getCount());