You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 09:58:23 UTC
[flink] 03/04: [FLINK-25958][runtime] Report completed checkpoint statistics only after the completed checkpoint is added to the checkpoint store
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 906f3244a3febf3bfe5221290d4cd3ad4746765f
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:55:31 2022 +0100
[FLINK-25958][runtime] Report completed checkpoint statistics only after the completed checkpoint is added to the checkpoint store
---
.../runtime/checkpoint/CheckpointCoordinator.java | 31 ++++++++++++++++++++--
.../runtime/checkpoint/PendingCheckpoint.java | 18 +------------
.../checkpoint/CheckpointCoordinatorTest.java | 21 +++++++++++++--
.../runtime/checkpoint/PendingCheckpointTest.java | 5 ++--
4 files changed, 51 insertions(+), 24 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5248538..5deee51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1256,11 +1256,39 @@ public class CheckpointCoordinator {
} else {
lastSubsumed = null;
}
+
+ reportCompletedCheckpoint(completedCheckpoint);
} finally {
pendingCheckpoints.remove(checkpointId);
scheduleTriggerRequest();
}
+ cleanupAfterCompletedCheckpoint(
+ pendingCheckpoint, checkpointId, completedCheckpoint, lastSubsumed, props);
+ }
+
+ private void reportCompletedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+ CompletedCheckpointStats completedCheckpointStats = completedCheckpoint.getStatistic();
+ if (completedCheckpointStats != null) {
+ LOG.trace(
+ "Checkpoint {} size: {}Kb, duration: {}ms",
+ completedCheckpoint.getCheckpointID(),
+ completedCheckpointStats.getStateSize() == 0
+ ? 0
+ : completedCheckpointStats.getStateSize() / 1024,
+ completedCheckpointStats.getEndToEndDuration());
+ // Finalize the statsCallback and give the completed checkpoint a
+ // callback for discards.
+ statsTracker.reportCompletedCheckpoint(completedCheckpointStats);
+ }
+ }
+
+ private void cleanupAfterCompletedCheckpoint(
+ PendingCheckpoint pendingCheckpoint,
+ long checkpointId,
+ CompletedCheckpoint completedCheckpoint,
+ CompletedCheckpoint lastSubsumed,
+ CheckpointProperties props) {
// remember recent checkpoint id for debugging purposes
rememberRecentCheckpointId(checkpointId);
@@ -1313,8 +1341,7 @@ public class CheckpointCoordinator {
pendingCheckpoint.finalizeCheckpoint(
checkpointsCleaner,
this::scheduleTriggerRequest,
- executor,
- statsTracker);
+ executor);
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
return completedCheckpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 76c6a44..08cd23f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -303,10 +303,7 @@ public class PendingCheckpoint implements Checkpoint {
}
public CompletedCheckpoint finalizeCheckpoint(
- CheckpointsCleaner checkpointsCleaner,
- Runnable postCleanup,
- Executor executor,
- CheckpointStatsTracker statsTracker)
+ CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, Executor executor)
throws IOException {
synchronized (lock) {
@@ -342,19 +339,6 @@ public class PendingCheckpoint implements Checkpoint {
finalizedLocation,
toCompletedCheckpointStats(finalizedLocation));
- CompletedCheckpointStats completedCheckpointStats = completed.getStatistic();
- if (completedCheckpointStats != null) {
- LOG.trace(
- "Checkpoint {} size: {}Kb, duration: {}ms",
- checkpointId,
- completedCheckpointStats.getStateSize() == 0
- ? 0
- : completedCheckpointStats.getStateSize() / 1024,
- completedCheckpointStats.getEndToEndDuration());
-
- statsTracker.reportCompletedCheckpoint(completedCheckpointStats);
- }
-
onCompletionPromise.complete(completed);
// mark this pending checkpoint as disposed, but do NOT drop the state
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 2cfa183..df8d606 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1988,9 +1988,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
-
+ CheckpointStatsTracker statsTracker =
+ new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
// set up the coordinator and validate the initial state
- CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
+ CheckpointCoordinator checkpointCoordinator =
+ new CheckpointCoordinatorBuilder()
+ .setExecutionGraph(graph)
+ .setCheckpointCoordinatorConfiguration(
+ CheckpointCoordinatorConfiguration.builder()
+ .setAlignedCheckpointTimeout(Long.MAX_VALUE)
+ .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+ .build())
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .setCheckpointStatsTracker(statsTracker)
+ .build();
assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -2083,6 +2094,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
assertEquals(pending.getCheckpointId(), success.getCheckpointID());
assertEquals(2, success.getOperatorStates().size());
+ AbstractCheckpointStats actualStats =
+ statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
+
+ assertEquals(checkpointId, actualStats.getCheckpointId());
+ assertEquals(CheckpointStatsStatus.COMPLETED, actualStats.getStatus());
+
checkpointCoordinator.shutdown();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 294c6fa..799d743 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -217,8 +217,7 @@ public class PendingCheckpointTest {
assertFalse(future.isDone());
pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
assertTrue(pending.areTasksFullyAcknowledged());
- pending.finalizeCheckpoint(
- new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
+ pending.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
assertTrue(future.isDone());
// Finalize (missing ACKs)
@@ -228,7 +227,7 @@ public class PendingCheckpointTest {
assertFalse(future.isDone());
try {
pending.finalizeCheckpoint(
- new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
+ new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
// Expected