You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/31 08:46:15 UTC

[flink] branch release-1.9 updated (2a9e185 -> c4872af)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2a9e185  [FLINK-13499][maprfs] Handle MapR dependency purely through reflection
     new ab5c78d  [hotfix][tests] TimestampITCase: use explicit CheckpointFailureReason to ignore exception
     new 9407e1b  [FLINK-12858][checkpointing] Fail whole job when in-flight sync savepoint is discarded by a task
     new cc66a07  [FLINK-12858] Change the CHECKPOINT_COORDINATOR_SHUTDOWN to be not pre-flight.
     new 0c55710  [hotfix] Rename flag in CheckpointFailureReason to preFlight
     new ce166cc  [FLINK-13440] Report reason when failing job due to checkpoint failure.
     new 4f542cb  [FLINK-13440] Move checkpoint failure logic from scheduler to failure manager
     new c4872af  [FLINK-13440] Add test for FLINK-12858

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/checkpoint/CheckpointCoordinator.java  |  10 +-
 .../checkpoint/CheckpointFailureManager.java       |  27 ++-
 .../checkpoint/CheckpointFailureReason.java        |  55 +++---
 .../runtime/executiongraph/ExecutionGraph.java     |   9 +-
 .../flink/runtime/scheduler/LegacyScheduler.java   |  13 +-
 .../CheckpointCoordinatorFailureTest.java          |   2 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 201 ++++++++++-----------
 .../checkpoint/CheckpointFailureManagerTest.java   |   2 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |   2 +-
 .../test/streaming/runtime/TimestampITCase.java    |  11 +-
 11 files changed, 178 insertions(+), 156 deletions(-)


[flink] 07/07: [FLINK-13440] Add test for FLINK-12858

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c4872af6d336481df14fca89c5a103d1c7f62298
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jul 26 14:53:31 2019 +0200

    [FLINK-13440] Add test for FLINK-12858
---
 .../checkpoint/CheckpointFailureManager.java       |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 197 ++++++++++-----------
 2 files changed, 93 insertions(+), 106 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index f65a30a..8d12bef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -127,7 +127,7 @@ public class CheckpointFailureManager {
 	/**
 	 * Fails the whole job graph in case an in-progress synchronous savepoint is discarded.
 	 *
-	 * <p>If the checkpoint failure was cancelled at the checkpoint coordinator, i.e. before
+	 * <p>If the checkpoint was cancelled at the checkpoint coordinator, i.e. before
 	 * the synchronous savepoint barrier was sent to the tasks, then we do not cancel the job
 	 * as we do not risk having a deadlock.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index a98102e..b11bf62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializableObject;
@@ -89,6 +90,7 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -330,27 +332,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		});
 
 		// set up the coordinator
-		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-			600000,
-			600000,
-			0,
-			Integer.MAX_VALUE,
-			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-			true,
-			false,
-			0);
-		CheckpointCoordinator coord = new CheckpointCoordinator(
-			jid,
-			chkConfig,
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new StandaloneCheckpointIDCounter(),
-			new StandaloneCompletedCheckpointStore(1),
-			new MemoryStateBackend(),
-			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY,
-			checkpointFailureManager);
+		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
 
 		try {
 			// trigger the checkpoint. this should succeed
@@ -401,27 +383,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-				true,
-				false,
-				0);
-			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				chkConfig,
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				new MemoryStateBackend(),
-				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY,
-				failureManager);
+			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -510,27 +472,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-				true,
-				false,
-				0);
-			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				chkConfig,
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				new MemoryStateBackend(),
-				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY,
-				failureManager);
+			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -636,27 +578,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-				true,
-				false,
-				0);
-			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				chkConfig,
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				new MemoryStateBackend(),
-				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY,
-				failureManager);
+			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1585,27 +1507,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-			600000,
-			600000,
-			0,
-			Integer.MAX_VALUE,
-			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-			true,
-			false,
-			0);
-		CheckpointCoordinator coord = new CheckpointCoordinator(
-			jid,
-			chkConfig,
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new StandaloneCheckpointIDCounter(),
-			new StandaloneCompletedCheckpointStore(1),
-			new MemoryStateBackend(),
-			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY,
-			failureManager);
+		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -3994,6 +3896,91 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
+		final Tuple2<Integer, Throwable> invocationCounterAndException = Tuple2.of(0, null);
+		final Throwable expectedRootCause = new IOException("Custom-Exception");
+
+		final JobID jobId = new JobID();
+
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		final ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+		final ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+		// set up the coordinator and validate the initial state
+		final CheckpointCoordinator coordinator = getCheckpointCoordinator(jobId, vertex1, vertex2,
+				new CheckpointFailureManager(0, throwable -> {
+					invocationCounterAndException.f0 += 1;
+					invocationCounterAndException.f1 = throwable;
+				}));
+
+		final CompletableFuture<CompletedCheckpoint> savepointFuture = coordinator
+				.triggerSynchronousSavepoint(10L, false, "test-dir");
+
+		final PendingCheckpoint syncSavepoint = declineSynchronousSavepoint(jobId, coordinator, attemptID1, expectedRootCause);
+
+		assertTrue(syncSavepoint.isDiscarded());
+
+		try {
+			savepointFuture.get();
+			fail("Expected Exception not found.");
+		} catch (ExecutionException e) {
+			final Throwable cause = ExceptionUtils.stripExecutionException(e);
+			assertTrue(cause instanceof CheckpointException);
+			assertEquals(expectedRootCause.getMessage(), cause.getCause().getMessage());
+		}
+
+		assertEquals(1L, invocationCounterAndException.f0.intValue());
+		assertTrue(
+				invocationCounterAndException.f1 instanceof CheckpointException &&
+				invocationCounterAndException.f1.getCause().getMessage().equals(expectedRootCause.getMessage()));
+
+		coordinator.shutdown(JobStatus.FAILING);
+	}
+
+	private CheckpointCoordinator getCheckpointCoordinator(
+			final JobID jobId,
+			final ExecutionVertex vertex1,
+			final ExecutionVertex vertex2,
+			final CheckpointFailureManager failureManager) {
+
+		final CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				true,
+				false,
+				0);
+
+		return new CheckpointCoordinator(
+				jobId,
+				chkConfig,
+				new ExecutionVertex[]{vertex1, vertex2},
+				new ExecutionVertex[]{vertex1, vertex2},
+				new ExecutionVertex[]{vertex1, vertex2},
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				new MemoryStateBackend(),
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY,
+				failureManager);
+	}
+
+	private PendingCheckpoint declineSynchronousSavepoint(
+			final JobID jobId,
+			final CheckpointCoordinator coordinator,
+			final ExecutionAttemptID attemptID,
+			final Throwable reason) {
+
+		final long checkpointId = coordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		final PendingCheckpoint checkpoint = coordinator.getPendingCheckpoints().get(checkpointId);
+		coordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID, checkpointId, reason), TASK_MANAGER_LOCATION_INFO);
+		return checkpoint;
+	}
+
 	private void performIncrementalCheckpoint(
 		JobID jid,
 		CheckpointCoordinator coord,


[flink] 04/07: [hotfix] Rename flag in CheckpointFailureReason to preFlight

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c55710f5602f52085b441b38125a9cf24212ef6
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Jul 29 16:04:20 2019 +0200

    [hotfix] Rename flag in CheckpointFailureReason to preFlight
---
 .../apache/flink/runtime/checkpoint/CheckpointFailureReason.java    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 7b62e47..2686c93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -70,11 +70,11 @@ public enum CheckpointFailureReason {
 
 	// ------------------------------------------------------------------------
 
-	private final boolean isPreFlight;
+	private final boolean preFlight;
 	private final String message;
 
 	CheckpointFailureReason(boolean isPreFlight, String message) {
-		this.isPreFlight = isPreFlight;
+		this.preFlight = isPreFlight;
 		this.message = message;
 	}
 
@@ -86,6 +86,6 @@ public enum CheckpointFailureReason {
 	 * @return true if this value indicates a failure reason happening before a checkpoint is passed to a job's tasks.
 	 */
 	public boolean isPreFlight() {
-		return isPreFlight;
+		return preFlight;
 	}
 }


[flink] 03/07: [FLINK-12858] Change the CHECKPOINT_COORDINATOR_SHUTDOWN to be not pre-flight.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc66a074aab00fbaf3deee20955e974815e3ab5e
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jul 26 14:05:12 2019 +0200

    [FLINK-12858] Change the CHECKPOINT_COORDINATOR_SHUTDOWN to be not pre-flight.
---
 .../org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 6e00233..7b62e47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -54,7 +54,7 @@ public enum CheckpointFailureReason {
 
 	CHECKPOINT_DECLINED_INPUT_END_OF_STREAM(false, "Checkpoint was declined because one input stream is finished"),
 
-	CHECKPOINT_COORDINATOR_SHUTDOWN(true, "CheckpointCoordinator shutdown."),
+	CHECKPOINT_COORDINATOR_SHUTDOWN(false, "CheckpointCoordinator shutdown."),
 
 	CHECKPOINT_COORDINATOR_SUSPEND(false, "Checkpoint Coordinator is suspending."),
 


[flink] 02/07: [FLINK-12858][checkpointing] Fail whole job when in-flight sync savepoint is discarded by a task

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9407e1b4a56c853688d7395da448d7d107c6fb76
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Mon Jul 15 18:04:42 2019 +0200

    [FLINK-12858][checkpointing] Fail whole job when in-flight sync savepoint is discarded by a task
---
 .../checkpoint/CheckpointFailureReason.java        | 55 +++++++++++++---------
 .../flink/runtime/scheduler/LegacyScheduler.java   | 17 +++++++
 2 files changed, 49 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index e00cce7..6e00233 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -23,60 +23,69 @@ package org.apache.flink.runtime.checkpoint;
  */
 public enum CheckpointFailureReason {
 
-	PERIODIC_SCHEDULER_SHUTDOWN("Periodic checkpoint scheduler is shut down."),
+	PERIODIC_SCHEDULER_SHUTDOWN(true, "Periodic checkpoint scheduler is shut down."),
 
-	ALREADY_QUEUED("Another checkpoint request has already been queued."),
+	ALREADY_QUEUED(true, "Another checkpoint request has already been queued."),
 
-	TOO_MANY_CONCURRENT_CHECKPOINTS("The maximum number of concurrent checkpoints is exceeded"),
+	TOO_MANY_CONCURRENT_CHECKPOINTS(true, "The maximum number of concurrent checkpoints is exceeded"),
 
-	MINIMUM_TIME_BETWEEN_CHECKPOINTS("The minimum time between checkpoints is still pending. " +
+	MINIMUM_TIME_BETWEEN_CHECKPOINTS(true, "The minimum time between checkpoints is still pending. " +
 			"Checkpoint will be triggered after the minimum time."),
 
-	NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently running."),
+	NOT_ALL_REQUIRED_TASKS_RUNNING(true, "Not all required tasks are currently running."),
 
-	EXCEPTION("An Exception occurred while triggering the checkpoint."),
+	EXCEPTION(true, "An Exception occurred while triggering the checkpoint."),
 
-	CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
+	CHECKPOINT_EXPIRED(false, "Checkpoint expired before completing."),
 
-	CHECKPOINT_SUBSUMED("Checkpoint has been subsumed."),
+	CHECKPOINT_SUBSUMED(false, "Checkpoint has been subsumed."),
 
-	CHECKPOINT_DECLINED("Checkpoint was declined."),
+	CHECKPOINT_DECLINED(false, "Checkpoint was declined."),
 
-	CHECKPOINT_DECLINED_TASK_NOT_READY("Checkpoint was declined (tasks not ready)"),
+	CHECKPOINT_DECLINED_TASK_NOT_READY(false, "Checkpoint was declined (tasks not ready)"),
 
-	CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING("Task does not support checkpointing"),
+	CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING(false, "Task does not support checkpointing"),
 
-	CHECKPOINT_DECLINED_SUBSUMED("Checkpoint was canceled because a barrier from newer checkpoint was received."),
+	CHECKPOINT_DECLINED_SUBSUMED(false, "Checkpoint was canceled because a barrier from newer checkpoint was received."),
 
-	CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER("Task received cancellation from one of its inputs"),
+	CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER(false, "Task received cancellation from one of its inputs"),
 
-	CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED("The checkpoint alignment phase needed to buffer more than the configured maximum bytes"),
+	CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED(false, "The checkpoint alignment phase needed to buffer more than the configured maximum bytes"),
 
-	CHECKPOINT_DECLINED_INPUT_END_OF_STREAM("Checkpoint was declined because one input stream is finished"),
+	CHECKPOINT_DECLINED_INPUT_END_OF_STREAM(false, "Checkpoint was declined because one input stream is finished"),
 
-	CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
+	CHECKPOINT_COORDINATOR_SHUTDOWN(true, "CheckpointCoordinator shutdown."),
 
-	CHECKPOINT_COORDINATOR_SUSPEND("Checkpoint Coordinator is suspending."),
+	CHECKPOINT_COORDINATOR_SUSPEND(false, "Checkpoint Coordinator is suspending."),
 
-	JOB_FAILURE("The job has failed."),
+	JOB_FAILURE(false, "The job has failed."),
 
-	JOB_FAILOVER_REGION("FailoverRegion is restarting."),
+	JOB_FAILOVER_REGION(false, "FailoverRegion is restarting."),
 
-	TASK_CHECKPOINT_FAILURE("Task local checkpoint failure."),
+	TASK_CHECKPOINT_FAILURE(false, "Task local checkpoint failure."),
 
-	FINALIZE_CHECKPOINT_FAILURE("Failure to finalize checkpoint."),
+	FINALIZE_CHECKPOINT_FAILURE(false, "Failure to finalize checkpoint."),
 
-	TRIGGER_CHECKPOINT_FAILURE("Trigger checkpoint failure.");
+	TRIGGER_CHECKPOINT_FAILURE(false, "Trigger checkpoint failure.");
 
 	// ------------------------------------------------------------------------
 
+	private final boolean isPreFlight;
 	private final String message;
 
-	CheckpointFailureReason(String message) {
+	CheckpointFailureReason(boolean isPreFlight, String message) {
+		this.isPreFlight = isPreFlight;
 		this.message = message;
 	}
 
 	public String message() {
 		return message;
 	}
+
+	/**
+	 * @return true if this value indicates a failure reason happening before a checkpoint is passed to a job's tasks.
+	 */
+	public boolean isPreFlight() {
+		return isPreFlight;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index 7dec7fc..fad7c9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -75,6 +77,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.function.FunctionUtils;
@@ -618,6 +621,13 @@ public class LegacyScheduler implements SchedulerNG {
 			.handle((completedCheckpoint, throwable) -> {
 				if (throwable != null) {
 					log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
+					// it's possible this failure hasn't triggered a task failure, but rather the savepoint was aborted
+					// "softly" (for example on checkpoints barriers alignment, due to buffer limits).
+					// such situation may leave other tasks of the job in a blocking state.
+					// to workaround this, we fail the whole job.
+					if (!isCheckpointPreFlightFailure(throwable)) {
+						executionGraph.failGlobal(throwable);
+					}
 					throw new CompletionException(throwable);
 				}
 				return completedCheckpoint.getExternalPointer();
@@ -649,4 +659,11 @@ public class LegacyScheduler implements SchedulerNG {
 			.map(TaskManagerLocation::toString)
 			.orElse("Unknown location");
 	}
+
+	private static boolean isCheckpointPreFlightFailure(Throwable throwable) {
+		return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
+			.map(CheckpointException::getCheckpointFailureReason)
+			.map(CheckpointFailureReason::isPreFlight)
+			.orElse(false);
+	}
 }


[flink] 06/07: [FLINK-13440] Move checkpoint failure logic from scheduler to failure manager

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4f542cbe83ff225d059363da6a6f837405be7bfa
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jul 26 14:52:22 2019 +0200

    [FLINK-13440] Move checkpoint failure logic from scheduler to failure manager
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 10 +++++++-
 .../checkpoint/CheckpointFailureManager.java       | 23 +++++++++++++++++
 .../flink/runtime/scheduler/LegacyScheduler.java   | 30 +++-------------------
 3 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 682685c..7f258c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -59,6 +59,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -395,7 +396,14 @@ public class CheckpointCoordinator {
 			@Nullable final String targetLocation) {
 
 		final CheckpointProperties properties = CheckpointProperties.forSyncSavepoint();
-		return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation);
+		return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation).handle(
+				(completedCheckpoint, throwable) -> {
+					if (throwable != null) {
+						failureManager.handleSynchronousSavepointFailure(throwable);
+						throw new CompletionException(throwable);
+					}
+					return completedCheckpoint;
+				});
 	}
 
 	private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 07f37fd..f65a30a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.util.Set;
@@ -124,6 +125,28 @@ public class CheckpointFailureManager {
 	}
 
 	/**
+	 * Fails the whole job graph in case an in-progress synchronous savepoint is discarded.
+	 *
+	 * <p>If the checkpoint failure was cancelled at the checkpoint coordinator, i.e. before
+	 * the synchronous savepoint barrier was sent to the tasks, then we do not cancel the job
+	 * as we do not risk having a deadlock.
+	 *
+	 * @param cause The reason why the job is cancelled.
+	 * */
+	void handleSynchronousSavepointFailure(final Throwable cause) {
+		if (!isPreFlightFailure(cause)) {
+			failureCallback.failJob(cause);
+		}
+	}
+
+	private static boolean isPreFlightFailure(final Throwable cause) {
+		return ExceptionUtils.findThrowable(cause, CheckpointException.class)
+				.map(CheckpointException::getCheckpointFailureReason)
+				.map(CheckpointFailureReason::isPreFlight)
+				.orElse(false);
+	}
+
+	/**
 	 * A callback interface about how to fail a job.
 	 */
 	public interface FailJobCallback {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index fad7c9f..e33b6b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -30,8 +30,6 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -77,7 +75,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.function.FunctionUtils;
@@ -608,30 +605,16 @@ public class LegacyScheduler implements SchedulerNG {
 					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
 		}
 
-		final long now = System.currentTimeMillis();
-
 		// we stop the checkpoint coordinator so that we are guaranteed
 		// to have only the data of the synchronous savepoint committed.
 		// in case of failure, and if the job restarts, the coordinator
 		// will be restarted by the CheckpointCoordinatorDeActivator.
 		checkpointCoordinator.stopCheckpointScheduler();
 
+		final long now = System.currentTimeMillis();
 		final CompletableFuture<String> savepointFuture = checkpointCoordinator
-			.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
-			.handle((completedCheckpoint, throwable) -> {
-				if (throwable != null) {
-					log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
-					// it's possible this failure hasn't triggered a task failure, but rather the savepoint was aborted
-					// "softly" (for example on checkpoints barriers alignment, due to buffer limits).
-					// such situation may leave other tasks of the job in a blocking state.
-					// to workaround this, we fail the whole job.
-					if (!isCheckpointPreFlightFailure(throwable)) {
-						executionGraph.failGlobal(throwable);
-					}
-					throw new CompletionException(throwable);
-				}
-				return completedCheckpoint.getExternalPointer();
-			});
+				.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
+				.thenApply(CompletedCheckpoint::getExternalPointer);
 
 		final CompletableFuture<JobStatus> terminationFuture = executionGraph
 			.getTerminationFuture()
@@ -659,11 +642,4 @@ public class LegacyScheduler implements SchedulerNG {
 			.map(TaskManagerLocation::toString)
 			.orElse("Unknown location");
 	}
-
-	private static boolean isCheckpointPreFlightFailure(Throwable throwable) {
-		return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
-			.map(CheckpointException::getCheckpointFailureReason)
-			.map(CheckpointFailureReason::isPreFlight)
-			.orElse(false);
-	}
 }


[flink] 01/07: [hotfix][tests] TimestampITCase: use explicit CheckpointFailureReason to ignore exception

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab5c78d238948e579a06e9e02d31c0c88f71f9fd
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Wed Jul 17 11:13:16 2019 +0200

    [hotfix][tests] TimestampITCase: use explicit CheckpointFailureReason to ignore exception
---
 .../apache/flink/test/streaming/runtime/TimestampITCase.java  | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 2a1b824..f8d8221 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -47,6 +48,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -197,10 +199,11 @@ public class TimestampITCase extends TestLogger {
 							clusterClient.stopWithSavepoint(id, false, "test");
 						}
 						catch (Exception e) {
-							if (
-									!(e.getCause() instanceof CheckpointException) ||
-									!e.getCause().getMessage().contains("Not all required tasks are currently running.")
-							) {
+							boolean ignoreException = ExceptionUtils.findThrowable(e, CheckpointException.class)
+								.map(CheckpointException::getCheckpointFailureReason)
+								.map(reason -> reason == CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING)
+								.orElse(false);
+							if (!ignoreException) {
 								throw e;
 							}
 						}


[flink] 05/07: [FLINK-13440] Report reason when failing job due to checkpoint failure.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ce166cc90c951b73537fe581f2ba02125f1c4ced
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Jul 25 15:05:46 2019 +0200

    [FLINK-13440] Report reason when failing job due to checkpoint failure.
---
 .../flink/runtime/checkpoint/CheckpointFailureManager.java       | 4 ++--
 .../org/apache/flink/runtime/executiongraph/ExecutionGraph.java  | 9 ++++-----
 .../runtime/checkpoint/CheckpointCoordinatorFailureTest.java     | 2 +-
 .../runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java | 2 +-
 .../flink/runtime/checkpoint/CheckpointCoordinatorTest.java      | 4 ++--
 .../flink/runtime/checkpoint/CheckpointFailureManagerTest.java   | 2 +-
 .../flink/runtime/checkpoint/CheckpointStateRestoreTest.java     | 2 +-
 7 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 568e836..07f37fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -104,7 +104,7 @@ public class CheckpointFailureManager {
 
 		if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
 			clearCount();
-			failureCallback.failJob();
+			failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
 		}
 	}
 
@@ -128,7 +128,7 @@ public class CheckpointFailureManager {
 	 */
 	public interface FailJobCallback {
 
-		void failJob();
+		void failJob(final Throwable cause);
 
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0c85b52..cc51042 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -80,7 +80,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
@@ -581,10 +580,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 		checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
 
-		CheckpointFailureManager failureManager = new CheckpointFailureManager(chkConfig.getTolerableCheckpointFailureNumber(), () ->
-			getJobMasterMainThreadExecutor().execute(() ->
-				failGlobal(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."))
-			));
+		CheckpointFailureManager failureManager = new CheckpointFailureManager(
+				chkConfig.getTolerableCheckpointFailureNumber(),
+				cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause))
+		);
 
 		// create the coordinator that triggers and commits checkpoints and holds the state
 		checkpointCoordinator = new CheckpointCoordinator(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 2edbb1e..beda456 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -66,7 +66,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 
 		final long triggerTimestamp = 1L;
 
-		CheckpointFailureManager failureManager = new CheckpointFailureManager(0, () -> {});
+		CheckpointFailureManager failureManager = new CheckpointFailureManager(0, throwable -> {});
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 9990772..7bd28e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -442,7 +442,7 @@ public class CheckpointCoordinatorMasterHooksTest {
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY,
-				new CheckpointFailureManager(0, () -> {}));
+				new CheckpointFailureManager(0, throwable -> {}));
 	}
 
 	private static <T> T mockGeneric(Class<?> clazz) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index fcd7150..a98102e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -126,7 +126,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 	@Before
 	public void setUp() throws Exception {
-		failureManager = new CheckpointFailureManager(0, () -> {});
+		failureManager = new CheckpointFailureManager(0, throwable -> {});
 	}
 
 	@Test
@@ -325,7 +325,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		final String errorMsg = "Exceeded checkpoint failure tolerance number!";
 
-		CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, () -> {
+		CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, throwable -> {
 			throw new RuntimeException(errorMsg);
 		});
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 2f9c151..193cb2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -105,7 +105,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
 		private int invokeCounter = 0;
 
 		@Override
-		public void failJob() {
+		public void failJob(final Throwable cause) {
 			invokeCounter++;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 08a7a8c..1fa2a83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -67,7 +67,7 @@ public class CheckpointStateRestoreTest {
 
 	@Before
 	public void setUp() throws Exception {
-		failureManager = new CheckpointFailureManager(0, () -> {});
+		failureManager = new CheckpointFailureManager(0, throwable -> {});
 	}
 
 	/**