You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/09 12:49:48 UTC
[flink] 01/02: [FLINK-13593][checkpointing] Prevent failing the
wrong job in CheckpointFailureManager
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1e22e641e4185e1964459d2486e676526990d719
Author: Yu Li <li...@apache.org>
AuthorDate: Tue Aug 6 07:57:12 2019 +0200
[FLINK-13593][checkpointing] Prevent failing the wrong job in CheckpointFailureManager
This closes #9364.
---
.../runtime/checkpoint/CheckpointCoordinator.java | 52 ++++++++++++++++-----
.../checkpoint/CheckpointFailureManager.java | 54 +++++++++++++++++++---
.../runtime/executiongraph/ExecutionGraph.java | 24 +++++++++-
.../CheckpointCoordinatorFailureTest.java | 10 +++-
.../CheckpointCoordinatorMasterHooksTest.java | 11 ++++-
.../checkpoint/CheckpointCoordinatorTest.java | 44 ++++++++++++++----
.../checkpoint/CheckpointFailureManagerTest.java | 34 ++++++++------
.../checkpoint/CheckpointStateRestoreTest.java | 10 +++-
.../executiongraph/ExecutionGraphRestartTest.java | 41 ++++++++++++++++
9 files changed, 235 insertions(+), 45 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 7f258c9..9f4e703 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -396,6 +396,7 @@ public class CheckpointCoordinator {
@Nullable final String targetLocation) {
final CheckpointProperties properties = CheckpointProperties.forSyncSavepoint();
+
return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation).handle(
(completedCheckpoint, throwable) -> {
if (throwable != null) {
@@ -447,7 +448,7 @@ public class CheckpointCoordinator {
long latestGeneratedCheckpointId = getCheckpointIdCounter().get();
// here we can not get the failed pending checkpoint's id,
// so we pass the negative latest generated checkpoint id as a special flag
- failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId);
+ failureManager.handleJobLevelCheckpointException(e, -1 * latestGeneratedCheckpointId);
return false;
}
}
@@ -718,7 +719,7 @@ public class CheckpointCoordinator {
message.getTaskExecutionId(),
job,
taskManagerLocationInfo);
- discardCheckpoint(checkpoint, message.getReason());
+ discardCheckpoint(checkpoint, message.getReason(), message.getTaskExecutionId());
}
else if (checkpoint != null) {
// this should not happen
@@ -945,7 +946,7 @@ public class CheckpointCoordinator {
if (!pendingCheckpoint.isAcknowledgedBy(executionAttemptId)) {
pendingCheckpointIterator.remove();
- discardCheckpoint(pendingCheckpoint, cause);
+ discardCheckpoint(pendingCheckpoint, cause, executionAttemptId);
}
}
}
@@ -1332,8 +1333,12 @@ public class CheckpointCoordinator {
*
* @param pendingCheckpoint to discard
* @param cause for discarding the checkpoint
+ * @param executionAttemptID the execution attempt id of the failing task.
*/
- private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable cause) {
+ private void discardCheckpoint(
+ PendingCheckpoint pendingCheckpoint,
+ @Nullable Throwable cause,
+ ExecutionAttemptID executionAttemptID) {
assert(Thread.holdsLock(lock));
Preconditions.checkNotNull(pendingCheckpoint);
@@ -1342,12 +1347,12 @@ public class CheckpointCoordinator {
LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
if (cause == null) {
- failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED);
+ failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED, executionAttemptID);
} else if (cause instanceof CheckpointException) {
CheckpointException exception = (CheckpointException) cause;
- failPendingCheckpoint(pendingCheckpoint, exception.getCheckpointFailureReason(), cause);
+ failPendingCheckpointDueToTaskFailure(pendingCheckpoint, exception.getCheckpointFailureReason(), cause, executionAttemptID);
} else {
- failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause);
+ failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause, executionAttemptID);
}
rememberRecentCheckpointId(checkpointId);
@@ -1401,21 +1406,46 @@ public class CheckpointCoordinator {
}
private void failPendingCheckpoint(
- final PendingCheckpoint pendingCheckpoint,
- final CheckpointFailureReason reason) {
+ final PendingCheckpoint pendingCheckpoint,
+ final CheckpointFailureReason reason) {
failPendingCheckpoint(pendingCheckpoint, reason, null);
}
private void failPendingCheckpoint(
+ final PendingCheckpoint pendingCheckpoint,
+ final CheckpointFailureReason reason,
+ @Nullable final Throwable cause) {
+
+ CheckpointException exception = new CheckpointException(reason, cause);
+ pendingCheckpoint.abort(reason, cause);
+ failureManager.handleJobLevelCheckpointException(exception, pendingCheckpoint.getCheckpointId());
+
+ checkAndResetCheckpointScheduler();
+ }
+
+ private void failPendingCheckpointDueToTaskFailure(
+ final PendingCheckpoint pendingCheckpoint,
+ final CheckpointFailureReason reason,
+ final ExecutionAttemptID executionAttemptID) {
+
+ failPendingCheckpointDueToTaskFailure(pendingCheckpoint, reason, null, executionAttemptID);
+ }
+
+ private void failPendingCheckpointDueToTaskFailure(
final PendingCheckpoint pendingCheckpoint,
final CheckpointFailureReason reason,
- final Throwable cause) {
+ @Nullable final Throwable cause,
+ final ExecutionAttemptID executionAttemptID) {
CheckpointException exception = new CheckpointException(reason, cause);
pendingCheckpoint.abort(reason, cause);
- failureManager.handleCheckpointException(exception, pendingCheckpoint.getCheckpointId());
+ failureManager.handleTaskLevelCheckpointException(exception, pendingCheckpoint.getCheckpointId(), executionAttemptID);
+
+ checkAndResetCheckpointScheduler();
+ }
+ private void checkAndResetCheckpointScheduler() {
if (!shutdown && periodicScheduling && currentPeriodicTrigger == null) {
synchronized (lock) {
if (pendingCheckpoints.isEmpty() || allPendingCheckpointsDiscarded()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 8d12bef..841ac2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
@@ -50,7 +51,7 @@ public class CheckpointFailureManager {
}
/**
- * Handle checkpoint exception with a handler callback.
+ * Handle job level checkpoint exception with a handler callback.
*
* @param exception the checkpoint exception.
* @param checkpointId the failed checkpoint id used to count the continuous failure number based on
@@ -58,7 +59,38 @@ public class CheckpointFailureManager {
* happens before the checkpoint id generation. In this case, it will be specified a negative
* latest generated checkpoint id as a special flag.
*/
- public void handleCheckpointException(CheckpointException exception, long checkpointId) {
+ public void handleJobLevelCheckpointException(CheckpointException exception, long checkpointId) {
+ checkFailureCounter(exception, checkpointId);
+ if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+ clearCount();
+ failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
+ }
+ }
+
+ /**
+ * Handle task level checkpoint exception with a handler callback.
+ *
+ * @param exception the checkpoint exception.
+ * @param checkpointId the failed checkpoint id used to count the continuous failure number based on
+ * checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure
+ * happens before the checkpoint id generation. In this case, a negative latest generated
+ * checkpoint id is specified as a special flag.
+ * @param executionAttemptID the execution attempt id, as a safe guard.
+ */
+ public void handleTaskLevelCheckpointException(
+ CheckpointException exception,
+ long checkpointId,
+ ExecutionAttemptID executionAttemptID) {
+ checkFailureCounter(exception, checkpointId);
+ if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+ clearCount();
+ failureCallback.failJobDueToTaskFailure(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."), executionAttemptID);
+ }
+ }
+
+ public void checkFailureCounter(
+ CheckpointException exception,
+ long checkpointId) {
if (tolerableCpFailureNumber == UNLIMITED_TOLERABLE_FAILURE_NUMBER) {
return;
}
@@ -102,11 +134,6 @@ public class CheckpointFailureManager {
default:
throw new FlinkRuntimeException("Unknown checkpoint failure reason : " + reason.name());
}
-
- if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
- clearCount();
- failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
- }
}
/**
@@ -151,8 +178,21 @@ public class CheckpointFailureManager {
*/
public interface FailJobCallback {
+ /**
+ * Fails the whole job graph.
+ *
+ * @param cause The reason why the job fails.
+ */
void failJob(final Throwable cause);
+ /**
+ * Fails the whole job graph due to task failure.
+ *
+ * @param cause The reason why the job fails.
+ * @param failingTask The id of the failing task attempt to prevent failing the job multiple times.
+ */
+ void failJobDueToTaskFailure(final Throwable cause, final ExecutionAttemptID failingTask);
+
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e8134e5..4eb6a73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -575,8 +575,18 @@ public class ExecutionGraph implements AccessExecutionGraph {
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
CheckpointFailureManager failureManager = new CheckpointFailureManager(
- chkConfig.getTolerableCheckpointFailureNumber(),
- cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause))
+ chkConfig.getTolerableCheckpointFailureNumber(),
+ new CheckpointFailureManager.FailJobCallback() {
+ @Override
+ public void failJob(Throwable cause) {
+ getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
+ }
+
+ @Override
+ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
+ getJobMasterMainThreadExecutor().execute(() -> failGlobalIfExecutionIsStillRunning(cause, failingTask));
+ }
+ }
);
// create the coordinator that triggers and commits checkpoints and holds the state
@@ -1097,6 +1107,16 @@ public class ExecutionGraph implements AccessExecutionGraph {
}
}
+ void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID failingAttempt) {
+ final Execution failedExecution = currentExecutions.get(failingAttempt);
+ if (failedExecution != null && failedExecution.getState() == ExecutionState.RUNNING) {
+ failGlobal(cause);
+ } else {
+ LOG.debug("The failing attempt {} belongs to an already not" +
+ " running task thus won't fail the job", failingAttempt);
+ }
+ }
+
/**
* Fails the execution graph globally. This failure will not be recovered by a specific
* failover strategy, but results in a full restart of all tasks.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index beda456..c3059a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -66,7 +66,15 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
final long triggerTimestamp = 1L;
- CheckpointFailureManager failureManager = new CheckpointFailureManager(0, throwable -> {});
+ CheckpointFailureManager failureManager = new CheckpointFailureManager(
+ 0,
+ new CheckpointFailureManager.FailJobCallback() {
+ @Override
+ public void failJob(Throwable cause) {}
+
+ @Override
+ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {}
+ });
// set up the coordinator and validate the initial state
CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 7bd28e2..8067fb4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -442,7 +442,16 @@ public class CheckpointCoordinatorMasterHooksTest {
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY,
- new CheckpointFailureManager(0, throwable -> {}));
+ new CheckpointFailureManager(
+ 0,
+ new CheckpointFailureManager.FailJobCallback() {
+ @Override
+ public void failJob(Throwable cause) {}
+
+ @Override
+ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {}
+ })
+ );
}
private static <T> T mockGeneric(Class<?> clazz) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 18052bb..ffb148b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -128,7 +128,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Before
public void setUp() throws Exception {
- failureManager = new CheckpointFailureManager(0, throwable -> {});
+ failureManager = new CheckpointFailureManager(
+ 0,
+ new CheckpointFailureManager.FailJobCallback() {
+ @Override
+ public void failJob(Throwable cause) {}
+
+ @Override
+ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {}
+ });
}
@Test
@@ -327,9 +335,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
final String errorMsg = "Exceeded checkpoint failure tolerance number!";
- CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, throwable -> {
- throw new RuntimeException(errorMsg);
- });
+ CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(
+ 0,
+ new CheckpointFailureManager.FailJobCallback() {
+ @Override
+ public void failJob(Throwable cause) {
+ throw new RuntimeException(errorMsg);
+ }
+
+ @Override
+ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
+ throw new RuntimeException(errorMsg);
+ }
+ });
// set up the coordinator
CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
@@ -3910,10 +3928,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
// set up the coordinator and validate the initial state
final CheckpointCoordinator coordinator = getCheckpointCoordinator(jobId, vertex1, vertex2,
- new CheckpointFailureManager(0, throwable -> {
- invocationCounterAndException.f0 += 1;
- invocationCounterAndException.f1 = throwable;
- }));
+ new CheckpointFailureManager(
+ 0,
+ new CheckpointFailureManager.FailJobCallback() {
+ @Override
+ public void failJob(Throwable cause) {
+ invocationCounterAndException.f0 += 1;
+ invocationCounterAndException.f1 = cause;
+ }
+
+ @Override
+ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
+ throw new AssertionError("This method should not be called for the test.");
+ }
+ }));
final CompletableFuture<CompletedCheckpoint> savepointFuture = coordinator
.triggerSynchronousSavepoint(10L, false, "test-dir");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 193cb2d..6b050e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -33,15 +34,15 @@ public class CheckpointFailureManagerTest extends TestLogger {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
- failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1);
- failureManager.handleCheckpointException(
+ failureManager.handleJobLevelCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1);
+ failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
//ignore this
- failureManager.handleCheckpointException(
+ failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3);
- failureManager.handleCheckpointException(
+ failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 4);
assertEquals(1, callback.getInvokeCounter());
}
@@ -51,18 +52,18 @@ public class CheckpointFailureManagerTest extends TestLogger {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
- failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.EXCEPTION), 1);
- failureManager.handleCheckpointException(
+ failureManager.handleJobLevelCheckpointException(new CheckpointException(CheckpointFailureReason.EXCEPTION), 1);
+ failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
//ignore this
- failureManager.handleCheckpointException(
+ failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3);
//reset
failureManager.handleCheckpointSuccess(4);
- failureManager.handleCheckpointException(
+ failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED), 5);
assertEquals(0, callback.getInvokeCounter());
}
@@ -72,7 +73,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointFailureManager failureManager = new CheckpointFailureManager(0, callback);
for (CheckpointFailureReason reason : CheckpointFailureReason.values()) {
- failureManager.handleCheckpointException(new CheckpointException(reason), -1);
+ failureManager.handleJobLevelCheckpointException(new CheckpointException(reason), -1);
}
assertEquals(1, callback.getInvokeCounter());
@@ -83,16 +84,16 @@ public class CheckpointFailureManagerTest extends TestLogger {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
- failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1);
- failureManager.handleCheckpointException(
+ failureManager.handleJobLevelCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1);
+ failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
//ignore this
- failureManager.handleCheckpointException(
+ failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3);
//ignore repeatedly report from one checkpoint
- failureManager.handleCheckpointException(
+ failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
assertEquals(0, callback.getInvokeCounter());
}
@@ -105,7 +106,12 @@ public class CheckpointFailureManagerTest extends TestLogger {
private int invokeCounter = 0;
@Override
- public void failJob(final Throwable cause) {
+ public void failJob(Throwable cause) {
+ invokeCounter++;
+ }
+
+ @Override
+ public void failJobDueToTaskFailure(final Throwable cause, final ExecutionAttemptID executionAttemptID) {
invokeCounter++;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 1fa2a83..a11dfc8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -67,7 +67,15 @@ public class CheckpointStateRestoreTest {
@Before
public void setUp() throws Exception {
- failureManager = new CheckpointFailureManager(0, throwable -> {});
+ failureManager = new CheckpointFailureManager(
+ 0,
+ new CheckpointFailureManager.FailJobCallback() {
+ @Override
+ public void failJob(Throwable cause) {}
+
+ @Override
+ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {}
+ });
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index a885398..785ae8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -733,6 +733,47 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.RUNNING, executionGraph.getState());
}
+ @Test
+ public void failGlobalIfExecutionIsStillRunning_failingAnExecutionTwice_ShouldTriggerOnlyOneFailover() throws Exception {
+ JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
+ JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
+ JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
+
+ try (SlotPool slotPool = createSlotPoolImpl()) {
+ ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(new TestRestartStrategy(1, false))
+ .setJobGraph(jobGraph)
+ .setNumberOfTasks(2)
+ .buildAndScheduleForExecution(slotPool);
+
+ Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
+
+ Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
+ Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();
+
+ finishedExecution.markFinished();
+
+ failedExecution.fail(new Exception("Test Exception"));
+ failedExecution.completeCancelling();
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ // At this point all resources have been assigned
+ for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+ assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource());
+ vertex.getCurrentExecutionAttempt().switchToRunning();
+ }
+
+ // fail global with old finished execution, this should not affect the execution
+ eg.failGlobalIfExecutionIsStillRunning(new Exception("This should have no effect"), finishedExecution.getAttemptId());
+
+ assertThat(eg.getState(), is(JobStatus.RUNNING));
+
+ // the state of the finished execution should have not changed since it is terminal
+ assertThat(finishedExecution.getState(), is(ExecutionState.FINISHED));
+ }
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------