You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/11/07 22:38:11 UTC
[flink] branch release-1.9 updated: [FLINK-13969][Checkpointing] Do
not allow trigger new checkpoint after stopping the coordinator
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 830f2d7 [FLINK-13969][Checkpointing] Do not allow trigger new checkpoint after stopping the coordinator
830f2d7 is described below
commit 830f2d7a378e440fbaae2d95ac793ec5c87ed088
Author: klion26 <qc...@gmail.com>
AuthorDate: Thu Nov 7 15:52:18 2019 +0100
[FLINK-13969][Checkpointing] Do not allow trigger new checkpoint after stopping the coordinator
Currently, we only check whether the coordinator has been stopped in the eager pre-check; if the coordinator is stopped after the eager pre-check
has been done, we will still trigger a new checkpoint even though the coordinator has been stopped.
In this commit we prevent triggering a new checkpoint in such a case.
[hotfix] extract common logic for checkpoint trigger check
[hotfix] Get rid of Mockito in test
---
.../runtime/checkpoint/CheckpointCoordinator.java | 74 ++++++++++------------
.../checkpoint/CheckpointCoordinatorTest.java | 62 ++++++++++++++++++
2 files changed, 94 insertions(+), 42 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9f4e703..5f7ec4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -467,30 +467,7 @@ public class CheckpointCoordinator {
// make some eager pre-checks
synchronized (lock) {
- // abort if the coordinator has been shutdown in the meantime
- if (shutdown) {
- throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
- }
-
- // Don't allow periodic checkpoint if scheduling has been disabled
- if (isPeriodic && !periodicScheduling) {
- throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
- }
-
- // validate whether the checkpoint can be triggered, with respect to the limit of
- // concurrent checkpoints, and the minimum time between checkpoints.
- // these checks are not relevant for savepoints
- if (!props.forceCheckpoint()) {
- // sanity check: there should never be more than one trigger request queued
- if (triggerRequestQueued) {
- LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
- throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
- }
-
- checkConcurrentCheckpoints();
-
- checkMinPauseBetweenCheckpoints();
- }
+ preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
}
// check if all tasks that we need to trigger are running.
@@ -600,19 +577,7 @@ public class CheckpointCoordinator {
synchronized (lock) {
// since we released the lock in the meantime, we need to re-check
// that the conditions still hold.
- if (shutdown) {
- throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
- }
- else if (!props.forceCheckpoint()) {
- if (triggerRequestQueued) {
- LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
- throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
- }
-
- checkConcurrentCheckpoints();
-
- checkMinPauseBetweenCheckpoints();
- }
+ preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
@@ -660,7 +625,7 @@ public class CheckpointCoordinator {
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
- checkpointID, job, numUnsuccessful, t);
+ checkpointID, job, numUnsuccessful, t);
if (!checkpoint.isDiscarded()) {
failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
@@ -668,15 +633,17 @@ public class CheckpointCoordinator {
try {
checkpointStorageLocation.disposeOnFailure();
- }
- catch (Throwable t2) {
+ } catch (Throwable t2) {
LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
}
+ // rethrow the CheckpointException directly.
+ if (t instanceof CheckpointException) {
+ throw (CheckpointException) t;
+ }
throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
}
-
- } // end trigger lock
+ }
}
// --------------------------------------------------------------------------------------------
@@ -1459,4 +1426,27 @@ public class CheckpointCoordinator {
private boolean allPendingCheckpointsDiscarded() {
return pendingCheckpoints.values().stream().allMatch(PendingCheckpoint::isDiscarded);
}
+
+ private void preCheckBeforeTriggeringCheckpoint(boolean isPeriodic, boolean forceCheckpoint) throws CheckpointException {
+ // abort if the coordinator has been shutdown in the meantime
+ if (shutdown) {
+ throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
+ }
+
+ // Don't allow periodic checkpoint if scheduling has been disabled
+ if (isPeriodic && !periodicScheduling) {
+ throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
+ }
+
+ if (!forceCheckpoint) {
+ if (triggerRequestQueued) {
+ LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
+ throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
+ }
+
+ checkConcurrentCheckpoints();
+
+ checkMinPauseBetweenCheckpoints();
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 98d80fd..90d3ee9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -96,6 +96,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -3961,6 +3962,52 @@ public class CheckpointCoordinatorTest extends TestLogger {
coordinator.shutdown(JobStatus.FAILING);
}
+ /**
+ * Tests that no checkpoint is triggered when the coordinator is stopped after the eager pre-check.
+ */
+ @Test
+ public void testTriggerCheckpointAfterCancel() throws Exception {
+ ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID());
+
+ // set up the coordinator
+ CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ true,
+ false,
+ 0);
+ TestingCheckpointIDCounter idCounter = new TestingCheckpointIDCounter();
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ chkConfig,
+ new ExecutionVertex[]{vertex1},
+ new ExecutionVertex[]{vertex1},
+ new ExecutionVertex[]{vertex1},
+ idCounter,
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY,
+ failureManager);
+ idCounter.setOwner(coord);
+
+ try {
+ // start the coordinator
+ coord.startCheckpointScheduler();
+ try {
+ coord.triggerCheckpoint(System.currentTimeMillis(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), null, true, false);
+ fail("should not trigger periodic checkpoint after stop the coordinator.");
+ } catch (CheckpointException e) {
+ assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+ }
+ } finally {
+ coord.shutdown(JobStatus.FINISHED);
+ }
+ }
+
private CheckpointCoordinator getCheckpointCoordinator(
final JobID jobId,
final ExecutionVertex vertex1,
@@ -4103,4 +4150,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
return vertex;
}
+
+ private static class TestingCheckpointIDCounter extends StandaloneCheckpointIDCounter {
+ private CheckpointCoordinator owner;
+
+ @Override
+ public long getAndIncrement() throws Exception {
+ checkNotNull(owner);
+ owner.stopCheckpointScheduler();
+ return super.getAndIncrement();
+ }
+
+ void setOwner(CheckpointCoordinator coordinator) {
+ this.owner = checkNotNull(coordinator);
+ }
+ }
}