You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 09:58:24 UTC
[flink] 04/04: [FLINK-25958][runtime] Report failed statistic if adding of completed checkpoint to checkpoint store fails
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fbfdb0e468356fe71826eb6b185ecda9bc8b1de3
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:59:58 2022 +0100
[FLINK-25958][runtime] Report failed statistic if adding of completed checkpoint to checkpoint store fails
---
.../runtime/checkpoint/CheckpointCoordinator.java | 15 ++++++++---
.../CheckpointCoordinatorFailureTest.java | 31 ++++++++++++++++++++--
.../checkpoint/CheckpointCoordinatorTest.java | 10 ++++++-
3 files changed, 50 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5deee51..475effc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1339,9 +1339,7 @@ public class CheckpointCoordinator {
try {
final CompletedCheckpoint completedCheckpoint =
pendingCheckpoint.finalizeCheckpoint(
- checkpointsCleaner,
- this::scheduleTriggerRequest,
- executor);
+ checkpointsCleaner, this::scheduleTriggerRequest, executor);
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
return completedCheckpoint;
@@ -1401,6 +1399,7 @@ public class CheckpointCoordinator {
checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
}
+ reportFailedCheckpoint(checkpointId, exception);
sendAbortedMessages(tasksToAbort, checkpointId, completedCheckpoint.getTimestamp());
throw new CheckpointException(
"Could not complete the pending checkpoint " + checkpointId + '.',
@@ -1409,6 +1408,16 @@ public class CheckpointCoordinator {
}
}
+ private void reportFailedCheckpoint(long checkpointId, Exception exception) {
+ PendingCheckpointStats pendingCheckpointStats =
+ statsTracker.getPendingCheckpointStats(checkpointId);
+ if (pendingCheckpointStats != null) {
+ statsTracker.reportFailedCheckpoint(
+ pendingCheckpointStats.toFailedCheckpoint(
+ System.currentTimeMillis(), exception));
+ }
+ }
+
void scheduleTriggerRequest() {
synchronized (lock) {
if (isShutdown()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index c02ee16..3674515 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
@@ -49,6 +50,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptyList;
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.assertStatsMetrics;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -204,6 +206,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
final CompletedCheckpointStore completedCheckpointStore =
new FailingCompletedCheckpointStore(failure);
+ CheckpointStatsTracker statsTracker =
+ new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
final AtomicInteger cleanupCallCount = new AtomicInteger(0);
final CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
@@ -226,14 +230,27 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
})
.setCompletedCheckpointStore(completedCheckpointStore)
.setTimer(manuallyTriggeredScheduledExecutor)
+ .setCheckpointStatsTracker(statsTracker)
.build();
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
-
+ CheckpointMetrics expectedReportedMetrics =
+ new CheckpointMetricsBuilder()
+ .setTotalBytesPersisted(18)
+ .setBytesPersistedOfThisCheckpoint(18)
+ .setBytesProcessedDuringAlignment(19)
+ .setAsyncDurationMillis(20)
+ .setAlignmentDurationNanos(123 * 1_000_000)
+ .setCheckpointStartDelayNanos(567 * 1_000_000)
+ .build();
try {
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(
- graph.getJobID(), attemptId, checkpointIDCounter.getLast()),
+ graph.getJobID(),
+ attemptId,
+ checkpointIDCounter.getLast(),
+ expectedReportedMetrics,
+ new TaskStateSnapshot()),
"unknown location");
fail("CheckpointException should have been thrown.");
} catch (CheckpointException e) {
@@ -242,6 +259,16 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
}
+ AbstractCheckpointStats actualStats =
+ statsTracker
+ .createSnapshot()
+ .getHistory()
+ .getCheckpointById(checkpointIDCounter.getLast());
+
+ assertEquals(checkpointIDCounter.getLast(), actualStats.getCheckpointId());
+ assertEquals(CheckpointStatsStatus.FAILED, actualStats.getStatus());
+ assertStatsMetrics(vertex.getJobvertexId(), 0, expectedReportedMetrics, actualStats);
+
assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index df8d606..3c09945 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,8 +299,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
AbstractCheckpointStats actual) {
assertEquals(checkpointId, actual.getCheckpointId());
assertEquals(CheckpointStatsStatus.FAILED, actual.getStatus());
- assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
assertEquals(0, actual.getNumberOfAcknowledgedSubtasks());
+ assertStatsMetrics(jobVertexID, subtasIdx, expected, actual);
+ }
+
+ public static void assertStatsMetrics(
+ JobVertexID jobVertexID,
+ int subtasIdx,
+ CheckpointMetrics expected,
+ AbstractCheckpointStats actual) {
+ assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
SubtaskStateStats taskStats =
actual.getAllTaskStateStats().stream()
.filter(s -> s.getJobVertexId().equals(jobVertexID))