You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/31 08:42:59 UTC

[flink] 05/07: [FLINK-13440] Report reason when failing job due to checkpoint failure.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81de0ee63d4b882cc8253f19826ef5605a5f7d27
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Jul 25 15:05:46 2019 +0200

    [FLINK-13440] Report reason when failing job due to checkpoint failure.
---
 .../flink/runtime/checkpoint/CheckpointFailureManager.java       | 4 ++--
 .../org/apache/flink/runtime/executiongraph/ExecutionGraph.java  | 9 ++++-----
 .../runtime/checkpoint/CheckpointCoordinatorFailureTest.java     | 2 +-
 .../runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java | 2 +-
 .../flink/runtime/checkpoint/CheckpointCoordinatorTest.java      | 4 ++--
 .../flink/runtime/checkpoint/CheckpointFailureManagerTest.java   | 2 +-
 .../flink/runtime/checkpoint/CheckpointStateRestoreTest.java     | 2 +-
 7 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 568e836..07f37fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -104,7 +104,7 @@ public class CheckpointFailureManager {
 
 		if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
 			clearCount();
-			failureCallback.failJob();
+			failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
 		}
 	}
 
@@ -128,7 +128,7 @@ public class CheckpointFailureManager {
 	 */
 	public interface FailJobCallback {
 
-		void failJob();
+		void failJob(final Throwable cause);
 
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0c85b52..cc51042 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -80,7 +80,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
@@ -581,10 +580,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 		checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
 
-		CheckpointFailureManager failureManager = new CheckpointFailureManager(chkConfig.getTolerableCheckpointFailureNumber(), () ->
-			getJobMasterMainThreadExecutor().execute(() ->
-				failGlobal(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."))
-			));
+		CheckpointFailureManager failureManager = new CheckpointFailureManager(
+				chkConfig.getTolerableCheckpointFailureNumber(),
+				cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause))
+		);
 
 		// create the coordinator that triggers and commits checkpoints and holds the state
 		checkpointCoordinator = new CheckpointCoordinator(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 2edbb1e..beda456 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -66,7 +66,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 
 		final long triggerTimestamp = 1L;
 
-		CheckpointFailureManager failureManager = new CheckpointFailureManager(0, () -> {});
+		CheckpointFailureManager failureManager = new CheckpointFailureManager(0, throwable -> {});
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 9990772..7bd28e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -442,7 +442,7 @@ public class CheckpointCoordinatorMasterHooksTest {
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY,
-				new CheckpointFailureManager(0, () -> {}));
+				new CheckpointFailureManager(0, throwable -> {}));
 	}
 
 	private static <T> T mockGeneric(Class<?> clazz) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index fcd7150..a98102e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -126,7 +126,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 	@Before
 	public void setUp() throws Exception {
-		failureManager = new CheckpointFailureManager(0, () -> {});
+		failureManager = new CheckpointFailureManager(0, throwable -> {});
 	}
 
 	@Test
@@ -325,7 +325,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		final String errorMsg = "Exceeded checkpoint failure tolerance number!";
 
-		CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, () -> {
+		CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, throwable -> {
 			throw new RuntimeException(errorMsg);
 		});
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 2f9c151..193cb2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -105,7 +105,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
 		private int invokeCounter = 0;
 
 		@Override
-		public void failJob() {
+		public void failJob(final Throwable cause) {
 			invokeCounter++;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 08a7a8c..1fa2a83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -67,7 +67,7 @@ public class CheckpointStateRestoreTest {
 
 	@Before
 	public void setUp() throws Exception {
-		failureManager = new CheckpointFailureManager(0, () -> {});
+		failureManager = new CheckpointFailureManager(0, throwable -> {});
 	}
 
 	/**