You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/08/13 10:31:31 UTC

[flink] 01/03: [FLINK-18856][checkpointing] Synchronize access to CheckpointCoordinator.lastCheckpointCompletionRelativeTime

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 19468a25a906fd1d8bc18113ee6facd2128d5855
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Aug 12 15:21:52 2020 +0200

    [FLINK-18856][checkpointing] Synchronize access to CheckpointCoordinator.lastCheckpointCompletionRelativeTime
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 29 ++++++++----
 .../checkpoint/CheckpointRequestDecider.java       | 52 +++++++---------------
 .../checkpoint/CheckpointCoordinatorTest.java      | 43 ++++++++++++++++++
 .../checkpoint/CheckpointRequestDeciderTest.java   |  4 +-
 4 files changed, 83 insertions(+), 45 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 76f9efd..d7b5cee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -317,8 +317,7 @@ public class CheckpointCoordinator {
 			this::rescheduleTrigger,
 			this.clock,
 			this.minPauseBetweenCheckpoints,
-			this.pendingCheckpoints::size,
-			this.lock);
+			this.pendingCheckpoints::size);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -498,9 +497,7 @@ public class CheckpointCoordinator {
 		}
 
 		CheckpointTriggerRequest request = new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
-		requestDecider
-			.chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime)
-			.ifPresent(this::startTriggeringCheckpoint);
+		chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
 		return request.onCompletionPromise;
 	}
 
@@ -833,7 +830,19 @@ public class CheckpointCoordinator {
 	}
 
 	private void executeQueuedRequest() {
-		requestDecider.chooseQueuedRequestToExecute(isTriggering, lastCheckpointCompletionRelativeTime).ifPresent(this::startTriggeringCheckpoint);
+		chooseQueuedRequestToExecute().ifPresent(this::startTriggeringCheckpoint);
+	}
+
+	private Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute() {
+		synchronized (lock) {
+			return requestDecider.chooseQueuedRequestToExecute(isTriggering, lastCheckpointCompletionRelativeTime);
+		}
+	}
+
+	private Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest request) {
+		synchronized (lock) {
+			return requestDecider.chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1412,7 +1421,9 @@ public class CheckpointCoordinator {
 	@Deprecated
 	@VisibleForTesting
 	PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
-		return requestDecider.getTriggerRequestQueue();
+		synchronized (lock) {
+			return requestDecider.getTriggerRequestQueue();
+		}
 	}
 
 	public boolean isTriggering() {
@@ -1548,7 +1559,9 @@ public class CheckpointCoordinator {
 	}
 
 	int getNumQueuedRequests() {
-		return requestDecider.getNumQueuedRequests();
+		synchronized (lock) {
+			return requestDecider.getNumQueuedRequests();
+		}
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 6a9f779..124dc13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -25,8 +25,6 @@ import org.apache.flink.util.clock.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import java.util.Comparator;
 import java.util.NavigableSet;
 import java.util.Optional;
@@ -51,8 +49,6 @@ class CheckpointRequestDecider {
 	private final Clock clock;
 	private final long minPauseBetweenCheckpoints;
 	private final Supplier<Integer> pendingCheckpointsSizeSupplier;
-	private final Object lock;
-	@GuardedBy("lock")
 	private final NavigableSet<CheckpointTriggerRequest> queuedRequests = new TreeSet<>(checkpointTriggerRequestsComparator());
 	private final int maxQueuedRequests;
 
@@ -61,15 +57,13 @@ class CheckpointRequestDecider {
 			Consumer<Long> rescheduleTrigger,
 			Clock clock,
 			long minPauseBetweenCheckpoints,
-			Supplier<Integer> pendingCheckpointsSizeSupplier,
-			Object lock) {
+			Supplier<Integer> pendingCheckpointsSizeSupplier) {
 		this(
 			maxConcurrentCheckpointAttempts,
 			rescheduleTrigger,
 			clock,
 			minPauseBetweenCheckpoints,
 			pendingCheckpointsSizeSupplier,
-			lock,
 			DEFAULT_MAX_QUEUED_REQUESTS
 		);
 	}
@@ -80,7 +74,6 @@ class CheckpointRequestDecider {
 			Clock clock,
 			long minPauseBetweenCheckpoints,
 			Supplier<Integer> pendingCheckpointsSizeSupplier,
-			Object lock,
 			int maxQueuedRequests) {
 		Preconditions.checkArgument(maxConcurrentCheckpointAttempts > 0);
 		Preconditions.checkArgument(maxQueuedRequests > 0);
@@ -89,43 +82,37 @@ class CheckpointRequestDecider {
 		this.clock = clock;
 		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
 		this.pendingCheckpointsSizeSupplier = pendingCheckpointsSizeSupplier;
-		this.lock = lock;
 		this.maxQueuedRequests = maxQueuedRequests;
 	}
 
 	Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
-		synchronized (lock) {
-			if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
-				// there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
-				newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
-				return Optional.empty();
-			} else {
-				queuedRequests.add(newRequest);
-				if (queuedRequests.size() > maxQueuedRequests) {
-					queuedRequests.pollLast().completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
-				}
-				Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
-				request.ifPresent(CheckpointRequestDecider::logInQueueTime);
-				return request;
+		if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
+			// there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
+			newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
+			return Optional.empty();
+		} else {
+			queuedRequests.add(newRequest);
+			if (queuedRequests.size() > maxQueuedRequests) {
+				queuedRequests.pollLast().completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
 			}
-		}
-	}
-
-	Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
-		synchronized (lock) {
 			Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
 			request.ifPresent(CheckpointRequestDecider::logInQueueTime);
 			return request;
 		}
 	}
 
+	Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
+		Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
+		request.ifPresent(CheckpointRequestDecider::logInQueueTime);
+		return request;
+	}
+
 	/**
 	 * Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the
 	 * current state. Does not lock itself: the caller must hold the coordinator lock. May update the state.
 	 * @return request to execute, if any.
 	 */
 	private Optional<CheckpointTriggerRequest> chooseRequestToExecute(boolean isTriggering, long lastCompletionMs) {
-		Preconditions.checkState(Thread.holdsLock(lock));
 		if (isTriggering || queuedRequests.isEmpty()) {
 			return Optional.empty();
 		}
@@ -156,22 +143,17 @@ class CheckpointRequestDecider {
 	@VisibleForTesting
 	@Deprecated
 	PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
-		synchronized (lock) {
-			return new PriorityQueue<>(queuedRequests);
-		}
+		return new PriorityQueue<>(queuedRequests);
 	}
 
 	void abortAll(CheckpointException exception) {
-		Preconditions.checkState(Thread.holdsLock(lock));
 		while (!queuedRequests.isEmpty()) {
 			queuedRequests.pollFirst().completeExceptionally(exception);
 		}
 	}
 
 	int getNumQueuedRequests() {
-		synchronized (lock) {
-			return queuedRequests.size();
-		}
+		return queuedRequests.size();
 	}
 
 	private static Comparator<CheckpointTriggerRequest> checkpointTriggerRequestsComparator() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 82f5889..eb96a1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -77,7 +78,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -122,6 +125,46 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	@Test
+	public void testMinCheckpointPause() throws Exception {
+		// will use a different thread to allow checkpoint triggering before exiting from receiveAcknowledgeMessage
+		ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+		try {
+			int pause = 1000;
+			JobID jobId = new JobID();
+			ExecutionAttemptID attemptId = new ExecutionAttemptID();
+			ExecutionVertex vertex = mockExecutionVertex(attemptId);
+
+			CheckpointCoordinator coordinator = new CheckpointCoordinatorBuilder()
+				.setTimer(new ScheduledExecutorServiceAdapter(executorService))
+					.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+						.setCheckpointInterval(pause)
+						.setCheckpointTimeout(Long.MAX_VALUE)
+						.setMaxConcurrentCheckpoints(1)
+						.setMinPauseBetweenCheckpoints(pause)
+						.build())
+					.setTasksToTrigger(new ExecutionVertex[]{vertex})
+					.setTasksToWaitFor(new ExecutionVertex[]{vertex})
+					.setTasksToCommitTo(new ExecutionVertex[]{vertex})
+					.setJobId(jobId)
+				.build();
+			coordinator.startCheckpointScheduler();
+
+			coordinator.triggerCheckpoint(true); // trigger, execute, and later complete by receiveAcknowledgeMessage
+			coordinator.triggerCheckpoint(true); // enqueue and later see if it gets executed in the middle of receiveAcknowledgeMessage
+			while (coordinator.getNumberOfPendingCheckpoints() == 0) { // wait for at least 1 request to be fully processed
+				Thread.sleep(10);
+			}
+			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, 1L), TASK_MANAGER_LOCATION_INFO);
+			Thread.sleep(pause / 2);
+			assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+			Thread.sleep(pause);
+			assertEquals(1, coordinator.getNumberOfPendingCheckpoints());
+		} finally {
+			executorService.shutdownNow();
+		}
+	}
+
+	@Test
 	public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
 		try {
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index 55b19b6..90967b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -169,7 +169,7 @@ public class CheckpointRequestDeciderTest {
 	private void testTiming(CheckpointTriggerRequest request, TriggerExpectation expectation) {
 		final long pause = 10;
 		final ManualClock clock = new ManualClock();
-		final CheckpointRequestDecider decider = new CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, new Object(), Integer.MAX_VALUE);
+		final CheckpointRequestDecider decider = new CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, Integer.MAX_VALUE);
 
 		final long lastCompletionMs = clock.relativeTimeMillis();
 		final boolean isTriggering = false;
@@ -220,7 +220,7 @@ public class CheckpointRequestDeciderTest {
 	private CheckpointRequestDecider decider(int maxQueued, int maxPending, int minPause, AtomicInteger currentPending) {
 		ManualClock clock = new ManualClock();
 		clock.advanceTime(1, TimeUnit.DAYS);
-		return new CheckpointRequestDecider(maxPending, NO_OP, clock, minPause, currentPending::get, new Object(), maxQueued);
+		return new CheckpointRequestDecider(maxPending, NO_OP, clock, minPause, currentPending::get, maxQueued);
 	}
 
 	private static final Consumer<Long> NO_OP = unused -> {