Posted to commits@flink.apache.org by tr...@apache.org on 2019/11/07 22:38:11 UTC

[flink] branch release-1.9 updated: [FLINK-13969][Checkpointing] Do not allow trigger new checkpoint after stopping the coordinator

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 830f2d7  [FLINK-13969][Checkpointing] Do not allow trigger new checkpoint after stopping the coordinator
830f2d7 is described below

commit 830f2d7a378e440fbaae2d95ac793ec5c87ed088
Author: klion26 <qc...@gmail.com>
AuthorDate: Thu Nov 7 15:52:18 2019 +0100

    [FLINK-13969][Checkpointing] Do not allow trigger new checkpoint after stopping the coordinator
    
    Currently, we only check whether the coordinator has been stopped in the eager pre-check. If the coordinator is stopped after the eager pre-check
    has been done, we would still trigger a new checkpoint even though the coordinator has been stopped.
    
    With this commit we prevent triggering a new checkpoint in such a case.
    
    [hotfix] extract common logic for checkpoint trigger check
    
    [hotfix] Get rid of Mockito in test
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 74 ++++++++++------------
 .../checkpoint/CheckpointCoordinatorTest.java      | 62 ++++++++++++++++++
 2 files changed, 94 insertions(+), 42 deletions(-)
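
The change follows a check-then-recheck pattern under the coordinator lock: the eager pre-check runs while the lock is held, the lock is released while the checkpoint is prepared (for example while the checkpoint ID is obtained), and the same checks are repeated once the lock is reacquired, so a coordinator that was stopped in between no longer triggers a checkpoint. A minimal sketch of that pattern, using hypothetical names rather than the actual Flink classes, looks roughly like this:

    // Simplified illustration only; names and structure are hypothetical,
    // not the actual org.apache.flink.runtime.checkpoint.CheckpointCoordinator.
    class Coordinator {
        private final Object lock = new Object();
        private volatile boolean shutdown;

        void trigger(boolean isPeriodic, boolean force) throws Exception {
            // eager pre-check under the lock
            synchronized (lock) {
                preCheck(isPeriodic, force);
            }

            // ... lock released here while the checkpoint is being prepared ...

            synchronized (lock) {
                // re-run the same checks: the coordinator may have been
                // stopped while the lock was not held
                preCheck(isPeriodic, force);
                // ... actually trigger the checkpoint ...
            }
        }

        private void preCheck(boolean isPeriodic, boolean force) throws Exception {
            if (shutdown) {
                throw new IllegalStateException("coordinator has been shut down");
            }
            // the real check additionally rejects periodic checkpoints when
            // scheduling is disabled and, unless forced, enforces the trigger
            // queue, concurrency and minimum-pause limits
        }
    }

In the actual CheckpointCoordinator this shared logic lives in preCheckBeforeTriggeringCheckpoint, which is called both for the eager pre-check and again after the lock is reacquired, as the diff below shows.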

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9f4e703..5f7ec4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -467,30 +467,7 @@ public class CheckpointCoordinator {
 
 		// make some eager pre-checks
 		synchronized (lock) {
-			// abort if the coordinator has been shutdown in the meantime
-			if (shutdown) {
-				throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
-			}
-
-			// Don't allow periodic checkpoint if scheduling has been disabled
-			if (isPeriodic && !periodicScheduling) {
-				throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
-			}
-
-			// validate whether the checkpoint can be triggered, with respect to the limit of
-			// concurrent checkpoints, and the minimum time between checkpoints.
-			// these checks are not relevant for savepoints
-			if (!props.forceCheckpoint()) {
-				// sanity check: there should never be more than one trigger request queued
-				if (triggerRequestQueued) {
-					LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
-					throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
-				}
-
-				checkConcurrentCheckpoints();
-
-				checkMinPauseBetweenCheckpoints();
-			}
+			preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
 		}
 
 		// check if all tasks that we need to trigger are running.
@@ -600,19 +577,7 @@ public class CheckpointCoordinator {
 				synchronized (lock) {
 					// since we released the lock in the meantime, we need to re-check
 					// that the conditions still hold.
-					if (shutdown) {
-						throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
-					}
-					else if (!props.forceCheckpoint()) {
-						if (triggerRequestQueued) {
-							LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
-							throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
-						}
-
-						checkConcurrentCheckpoints();
-
-						checkMinPauseBetweenCheckpoints();
-					}
+					preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
 
 					LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
 
@@ -660,7 +625,7 @@ public class CheckpointCoordinator {
 
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
 				LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
-						checkpointID, job, numUnsuccessful, t);
+					checkpointID, job, numUnsuccessful, t);
 
 				if (!checkpoint.isDiscarded()) {
 					failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
@@ -668,15 +633,17 @@ public class CheckpointCoordinator {
 
 				try {
 					checkpointStorageLocation.disposeOnFailure();
-				}
-				catch (Throwable t2) {
+				} catch (Throwable t2) {
 					LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
 				}
 
+				// rethrow the CheckpointException directly.
+				if (t instanceof CheckpointException) {
+					throw (CheckpointException) t;
+				}
 				throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
 			}
-
-		} // end trigger lock
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1459,4 +1426,27 @@ public class CheckpointCoordinator {
 	private boolean allPendingCheckpointsDiscarded() {
 		return pendingCheckpoints.values().stream().allMatch(PendingCheckpoint::isDiscarded);
 	}
+
+	private void preCheckBeforeTriggeringCheckpoint(boolean isPeriodic, boolean forceCheckpoint) throws CheckpointException {
+		// abort if the coordinator has been shutdown in the meantime
+		if (shutdown) {
+			throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
+		}
+
+		// Don't allow periodic checkpoint if scheduling has been disabled
+		if (isPeriodic && !periodicScheduling) {
+			throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
+		}
+
+		if (!forceCheckpoint) {
+			if (triggerRequestQueued) {
+				LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
+				throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
+			}
+
+			checkConcurrentCheckpoints();
+
+			checkMinPauseBetweenCheckpoints();
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 98d80fd..90d3ee9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -96,6 +96,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -3961,6 +3962,52 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		coordinator.shutdown(JobStatus.FAILING);
 	}
 
+	/**
+	 * Tests that no checkpoint is triggered when the coordinator is stopped after the eager pre-check.
+	 */
+	@Test
+	public void testTriggerCheckpointAfterCancel() throws Exception {
+		ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID());
+
+		// set up the coordinator
+		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			true,
+			false,
+			0);
+		TestingCheckpointIDCounter idCounter = new TestingCheckpointIDCounter();
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			new JobID(),
+			chkConfig,
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			idCounter,
+			new StandaloneCompletedCheckpointStore(1),
+			new MemoryStateBackend(),
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY,
+			failureManager);
+		idCounter.setOwner(coord);
+
+		try {
+			// start the coordinator
+			coord.startCheckpointScheduler();
+			try {
+				coord.triggerCheckpoint(System.currentTimeMillis(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), null, true, false);
+				fail("should not trigger a periodic checkpoint after stopping the coordinator.");
+			} catch (CheckpointException e) {
+				assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+			}
+		} finally {
+			coord.shutdown(JobStatus.FINISHED);
+		}
+	}
+
 	private CheckpointCoordinator getCheckpointCoordinator(
 			final JobID jobId,
 			final ExecutionVertex vertex1,
@@ -4103,4 +4150,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		}
 		return vertex;
 	}
+
+	private static class TestingCheckpointIDCounter extends StandaloneCheckpointIDCounter {
+		private CheckpointCoordinator owner;
+
+		@Override
+		public long getAndIncrement() throws Exception {
+			checkNotNull(owner);
+			owner.stopCheckpointScheduler();
+			return super.getAndIncrement();
+		}
+
+		void setOwner(CheckpointCoordinator coordinator) {
+			this.owner = checkNotNull(coordinator);
+		}
+	}
 }