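You are viewing a plain text version of this content. The canonical (HTML) version is available in the original mailing-list archive; its hyperlink is not preserved in this plain-text rendering.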
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/31 08:42:59 UTC
[flink] 05/07: [FLINK-13440] Report reason when failing job due to
checkpoint failure.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 81de0ee63d4b882cc8253f19826ef5605a5f7d27
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Jul 25 15:05:46 2019 +0200
[FLINK-13440] Report reason when failing job due to checkpoint failure.
---
.../flink/runtime/checkpoint/CheckpointFailureManager.java | 4 ++--
.../org/apache/flink/runtime/executiongraph/ExecutionGraph.java | 9 ++++-----
.../runtime/checkpoint/CheckpointCoordinatorFailureTest.java | 2 +-
.../runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java | 2 +-
.../flink/runtime/checkpoint/CheckpointCoordinatorTest.java | 4 ++--
.../flink/runtime/checkpoint/CheckpointFailureManagerTest.java | 2 +-
.../flink/runtime/checkpoint/CheckpointStateRestoreTest.java | 2 +-
7 files changed, 12 insertions(+), 13 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 568e836..07f37fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -104,7 +104,7 @@ public class CheckpointFailureManager {
if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
clearCount();
- failureCallback.failJob();
+ failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
}
}
@@ -128,7 +128,7 @@ public class CheckpointFailureManager {
*/
public interface FailJobCallback {
- void failJob();
+ void failJob(final Throwable cause);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0c85b52..cc51042 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -80,7 +80,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
@@ -581,10 +580,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
- CheckpointFailureManager failureManager = new CheckpointFailureManager(chkConfig.getTolerableCheckpointFailureNumber(), () ->
- getJobMasterMainThreadExecutor().execute(() ->
- failGlobal(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."))
- ));
+ CheckpointFailureManager failureManager = new CheckpointFailureManager(
+ chkConfig.getTolerableCheckpointFailureNumber(),
+ cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause))
+ );
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator = new CheckpointCoordinator(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 2edbb1e..beda456 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -66,7 +66,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
final long triggerTimestamp = 1L;
- CheckpointFailureManager failureManager = new CheckpointFailureManager(0, () -> {});
+ CheckpointFailureManager failureManager = new CheckpointFailureManager(0, throwable -> {});
// set up the coordinator and validate the initial state
CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 9990772..7bd28e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -442,7 +442,7 @@ public class CheckpointCoordinatorMasterHooksTest {
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY,
- new CheckpointFailureManager(0, () -> {}));
+ new CheckpointFailureManager(0, throwable -> {}));
}
private static <T> T mockGeneric(Class<?> clazz) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index fcd7150..a98102e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -126,7 +126,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Before
public void setUp() throws Exception {
- failureManager = new CheckpointFailureManager(0, () -> {});
+ failureManager = new CheckpointFailureManager(0, throwable -> {});
}
@Test
@@ -325,7 +325,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
final String errorMsg = "Exceeded checkpoint failure tolerance number!";
- CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, () -> {
+ CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, throwable -> {
throw new RuntimeException(errorMsg);
});
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 2f9c151..193cb2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -105,7 +105,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
private int invokeCounter = 0;
@Override
- public void failJob() {
+ public void failJob(final Throwable cause) {
invokeCounter++;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 08a7a8c..1fa2a83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -67,7 +67,7 @@ public class CheckpointStateRestoreTest {
@Before
public void setUp() throws Exception {
- failureManager = new CheckpointFailureManager(0, () -> {});
+ failureManager = new CheckpointFailureManager(0, throwable -> {});
}
/**