You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/27 15:12:13 UTC
[flink] branch release-1.11 updated: [FLINK-17351] Increase
`continuousFailureCounter` in `CheckpointFailureManager` for
CHECKPOINT_EXPIRED
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new f964141 [FLINK-17351] Increase `continuousFailureCounter` in `CheckpointFailureManager` for CHECKPOINT_EXPIRED
f964141 is described below
commit f9641411ca72ca46d74d769dc6d203fa8bb37f60
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Wed May 20 14:41:43 2020 +0800
[FLINK-17351] Increase `continuousFailureCounter` in `CheckpointFailureManager` for CHECKPOINT_EXPIRED
Before this PR, `CHECKPOINT_EXPIRED` was not counted in `continuousFailureCounter`. Hence,
if a checkpointing failure is detected only after the checkpoint has timed out, the failure is ignored because
the `PendingCheckpoint` has already been discarded. In theory, this leaves the job unable to restart automatically
unless something else fails.
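This PR counts `CHECKPOINT_EXPIRED` in `continuousFailureCounter` (at most once per checkpoint ID, like `CHECKPOINT_DECLINED`), so expired checkpoints now count toward the tolerable checkpoint failure number.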
---
.../checkpoint/CheckpointFailureManager.java | 2 +-
.../checkpoint/CheckpointCoordinatorTest.java | 60 +++++++++++++++++-----
.../checkpoint/CheckpointFailureManagerTest.java | 2 +-
3 files changed, 48 insertions(+), 16 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 0dc655b..fbf5d98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -117,7 +117,6 @@ public class CheckpointFailureManager {
case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
case EXCEPTION:
- case CHECKPOINT_EXPIRED:
case TASK_FAILURE:
case TASK_CHECKPOINT_FAILURE:
case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
@@ -127,6 +126,7 @@ public class CheckpointFailureManager {
break;
case CHECKPOINT_DECLINED:
+ case CHECKPOINT_EXPIRED:
//we should make sure one checkpoint only be counted once
if (countedCheckpointIds.add(checkpointId)) {
continuousFailureCounter.incrementAndGet();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index e799836..82f5889 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -84,6 +84,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionJobVertex;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -201,7 +202,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() throws Exception {
+ public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() {
final JobID jid = new JobID();
// create some mock Execution vertices that receive the checkpoint trigger messages
@@ -212,19 +213,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
final String errorMsg = "Exceeded checkpoint failure tolerance number!";
- CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(
- 0,
- new CheckpointFailureManager.FailJobCallback() {
- @Override
- public void failJob(Throwable cause) {
- throw new RuntimeException(errorMsg);
- }
-
- @Override
- public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
- throw new RuntimeException(errorMsg);
- }
- });
+ CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
// set up the coordinator
CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
@@ -262,6 +251,33 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
}
+ @Test
+ public void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
+ // create some mock Execution vertices that receive the checkpoint trigger messages
+ ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID());
+ ExecutionVertex vertex2 = mockExecutionVertex(new ExecutionAttemptID());
+
+ final String errorMsg = "Exceeded checkpoint failure tolerance number!";
+ CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
+ CheckpointCoordinator coord = getCheckpointCoordinator(new JobID(), vertex1, vertex2, checkpointFailureManager);
+
+ try {
+ coord.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ coord.abortPendingCheckpoints(new CheckpointException(CHECKPOINT_EXPIRED));
+
+ fail("Test failed.");
+ }
+ catch (Exception e) {
+ //expected
+ assertTrue(e instanceof RuntimeException);
+ assertEquals(errorMsg, e.getMessage());
+ } finally {
+ coord.shutdown(JobStatus.FINISHED);
+ }
+ }
+
/**
* This test triggers a checkpoint and then sends a decline checkpoint message from
* one of the tasks. The expected behaviour is that said checkpoint is discarded and a new
@@ -2299,6 +2315,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
.build();
}
+ private CheckpointFailureManager getCheckpointFailureManager(String errorMsg) {
+ return new CheckpointFailureManager(
+ 0,
+ new CheckpointFailureManager.FailJobCallback() {
+ @Override
+ public void failJob(Throwable cause) {
+ throw new RuntimeException(errorMsg);
+ }
+
+ @Override
+ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
+ throw new RuntimeException(errorMsg);
+ }
+ });
+ }
+
private PendingCheckpoint declineSynchronousSavepoint(
final JobID jobId,
final CheckpointCoordinator coordinator,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 6b050e5..8d43ed7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -76,7 +76,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
failureManager.handleJobLevelCheckpointException(new CheckpointException(reason), -1);
}
- assertEquals(1, callback.getInvokeCounter());
+ assertEquals(2, callback.getInvokeCounter());
}
@Test