You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/27 15:12:13 UTC

[flink] branch release-1.11 updated: [FLINK-17351] Increase `continuousFailureCounter` in `CheckpointFailureManager` for CHECKPOINT_EXPIRED

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f964141  [FLINK-17351] Increase `continuousFailureCounter` in `CheckpointFailureManager` for CHECKPOINT_EXPIRED
f964141 is described below

commit f9641411ca72ca46d74d769dc6d203fa8bb37f60
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Wed May 20 14:41:43 2020 +0800

    [FLINK-17351] Increase `continuousFailureCounter` in `CheckpointFailureManager` for CHECKPOINT_EXPIRED
    
    Before this PR, `CHECKPOINT_EXPIRED` was not counted in `continuousFailureCounter`. Hence,
    if a checkpointing failure is detected after the checkpoint has timed out, the failure gets ignored because
    the `PendingCheckpoint` has already been discarded, leaving the job unable, in theory, to restart automatically
    unless something else fails.
    
    This PR counts `CHECKPOINT_EXPIRED` in `continuousFailureCounter`.
---
 .../checkpoint/CheckpointFailureManager.java       |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 60 +++++++++++++++++-----
 .../checkpoint/CheckpointFailureManagerTest.java   |  2 +-
 3 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 0dc655b..fbf5d98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -117,7 +117,6 @@ public class CheckpointFailureManager {
 			case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
 
 			case EXCEPTION:
-			case CHECKPOINT_EXPIRED:
 			case TASK_FAILURE:
 			case TASK_CHECKPOINT_FAILURE:
 			case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
@@ -127,6 +126,7 @@ public class CheckpointFailureManager {
 				break;
 
 			case CHECKPOINT_DECLINED:
+			case CHECKPOINT_EXPIRED:
 				//we should make sure one checkpoint only be counted once
 				if (countedCheckpointIds.add(checkpointId)) {
 					continuousFailureCounter.incrementAndGet();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index e799836..82f5889 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -84,6 +84,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionJobVertex;
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -201,7 +202,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	@Test
-	public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() throws Exception {
+	public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() {
 		final JobID jid = new JobID();
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
@@ -212,19 +213,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		final String errorMsg = "Exceeded checkpoint failure tolerance number!";
 
-		CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(
-			0,
-			new CheckpointFailureManager.FailJobCallback() {
-				@Override
-				public void failJob(Throwable cause) {
-					throw new RuntimeException(errorMsg);
-				}
-
-				@Override
-				public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
-					throw new RuntimeException(errorMsg);
-				}
-			});
+		CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
 
 		// set up the coordinator
 		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
@@ -262,6 +251,33 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID());
+		ExecutionVertex vertex2 = mockExecutionVertex(new ExecutionAttemptID());
+
+		final String errorMsg = "Exceeded checkpoint failure tolerance number!";
+		CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
+		CheckpointCoordinator coord = getCheckpointCoordinator(new JobID(), vertex1, vertex2, checkpointFailureManager);
+
+		try {
+			coord.triggerCheckpoint(false);
+			manuallyTriggeredScheduledExecutor.triggerAll();
+
+			coord.abortPendingCheckpoints(new CheckpointException(CHECKPOINT_EXPIRED));
+
+			fail("Test failed.");
+		}
+		catch (Exception e) {
+			//expected
+			assertTrue(e instanceof RuntimeException);
+			assertEquals(errorMsg, e.getMessage());
+		} finally {
+			coord.shutdown(JobStatus.FINISHED);
+		}
+	}
+
 	/**
 	 * This test triggers a checkpoint and then sends a decline checkpoint message from
 	 * one of the tasks. The expected behaviour is that said checkpoint is discarded and a new
@@ -2299,6 +2315,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			.build();
 	}
 
+	private CheckpointFailureManager getCheckpointFailureManager(String errorMsg) {
+		return new CheckpointFailureManager(
+			0,
+			new CheckpointFailureManager.FailJobCallback() {
+				@Override
+				public void failJob(Throwable cause) {
+					throw new RuntimeException(errorMsg);
+				}
+
+				@Override
+				public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
+					throw new RuntimeException(errorMsg);
+				}
+			});
+	}
+
 	private PendingCheckpoint declineSynchronousSavepoint(
 			final JobID jobId,
 			final CheckpointCoordinator coordinator,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 6b050e5..8d43ed7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -76,7 +76,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
 			failureManager.handleJobLevelCheckpointException(new CheckpointException(reason), -1);
 		}
 
-		assertEquals(1, callback.getInvokeCounter());
+		assertEquals(2, callback.getInvokeCounter());
 	}
 
 	@Test