You are viewing a plain-text version of this content. The canonical link to the original message is not included in this plain-text copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/08/13 10:31:30 UTC

[flink] branch release-1.11 updated (3549001 -> 68d6bf5)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3549001  [FLINK-18815] Close safety net guarded closeable iff it is still registered
     new 19468a2  [FLINK-18856][checkpointing] Synchronize access to CheckpointCoordinator.lastCheckpointCompletionRelativeTime
     new 35cf138  [hotfix][test] Fix formatting in CheckpointRequestDeciderTest
     new 68d6bf5  [hotfix][docs] Add javadoc to CheckpointRequestDecider

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/checkpoint/CheckpointCoordinator.java  | 29 ++++++---
 .../checkpoint/CheckpointRequestDecider.java       | 75 +++++++++++-----------
 .../checkpoint/CheckpointCoordinatorTest.java      | 43 +++++++++++++
 .../checkpoint/CheckpointRequestDeciderTest.java   |  8 +--
 4 files changed, 105 insertions(+), 50 deletions(-)


[flink] 02/03: [hotfix][test] Fix formatting in CheckpointRequestDeciderTest

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35cf138d30df7809749ab072d15427a7452d8132
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Sat Aug 8 12:53:27 2020 +0200

    [hotfix][test] Fix formatting in CheckpointRequestDeciderTest
---
 .../apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index 90967b9..f8f3270 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -66,7 +66,6 @@ public class CheckpointRequestDeciderTest {
 		assertEquals(Optional.of(request), decider.chooseQueuedRequestToExecute(isTriggering, 0));
 	}
 
-
 	@Test
 	public void testNonForcedEnqueueOnTooManyPending() {
 		final int maxPending = 1;
@@ -99,7 +98,6 @@ public class CheckpointRequestDeciderTest {
 				new CheckpointTriggerRequest[]{savepoint, checkpoint});
 	}
 
-
 	@Test
 	public void testNonForcedUserSubmittedPrioritized() {
 		CheckpointTriggerRequest userSubmitted = nonForcedSavepoint();
@@ -118,8 +116,6 @@ public class CheckpointRequestDeciderTest {
 			new CheckpointTriggerRequest[]{savepoint, checkpoint});
 	}
 
-
-
 	@Test
 	public void testQueueSizeLimit() {
 		final int maxQueuedRequests = 10;


[flink] 03/03: [hotfix][docs] Add javadoc to CheckpointRequestDecider

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68d6bf573b562839847f11f3b5128d12aa7a5c67
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Aug 12 15:37:05 2020 +0200

    [hotfix][docs] Add javadoc to CheckpointRequestDecider
---
 .../checkpoint/CheckpointRequestDecider.java       | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 124dc13..455766b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -38,6 +38,19 @@ import static java.lang.System.identityHashCode;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS;
 
+/**
+ * Decides whether a {@link CheckpointCoordinator.CheckpointTriggerRequest checkpoint request} should be executed,
+ * dropped or postponed.
+ * Dropped requests are failed immediately.
+ * Postponed requests are enqueued into a queue and can be dequeued later.
+ * <p>
+ * Decision is made according to:<ul>
+ * <li>checkpoint properties (e.g. isForce, isPeriodic)</li>
+ * <li>checkpointing configuration (e.g. max concurrent checkpoints, min pause)</li>
+ * <li>current state (other queued requests, pending checkpoints, last checkpoint completion time)</li>
+ * </ul>
+ * </p>
+ */
 @SuppressWarnings("ConstantConditions")
 class CheckpointRequestDecider {
 	private static final Logger LOG = LoggerFactory.getLogger(CheckpointRequestDecider.class);
@@ -85,6 +98,10 @@ class CheckpointRequestDecider {
 		this.maxQueuedRequests = maxQueuedRequests;
 	}
 
+	/**
+	 * Submit a new checkpoint request and decide whether it or some other request can be executed.
+	 * @return request that should be executed
+	 */
 	Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
 		if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
 			// there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
@@ -101,6 +118,10 @@ class CheckpointRequestDecider {
 		}
 	}
 
+	/**
+	 * Choose one of the queued requests to execute, if any.
+	 * @return request that should be executed
+	 */
 	Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
 		Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
 		request.ifPresent(CheckpointRequestDecider::logInQueueTime);
@@ -110,7 +131,7 @@ class CheckpointRequestDecider {
 	/**
 	 * Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the
 	 * current state. Acquires a lock and may update the state.
-	 * @return request to execute, if any.
+	 * @return request that should be executed
 	 */
 	private Optional<CheckpointTriggerRequest> chooseRequestToExecute(boolean isTriggering, long lastCompletionMs) {
 		if (isTriggering || queuedRequests.isEmpty()) {


[flink] 01/03: [FLINK-18856][checkpointing] Synchronize access to CheckpointCoordinator.lastCheckpointCompletionRelativeTime

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 19468a25a906fd1d8bc18113ee6facd2128d5855
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Aug 12 15:21:52 2020 +0200

    [FLINK-18856][checkpointing] Synchronize access to CheckpointCoordinator.lastCheckpointCompletionRelativeTime
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 29 ++++++++----
 .../checkpoint/CheckpointRequestDecider.java       | 52 +++++++---------------
 .../checkpoint/CheckpointCoordinatorTest.java      | 43 ++++++++++++++++++
 .../checkpoint/CheckpointRequestDeciderTest.java   |  4 +-
 4 files changed, 83 insertions(+), 45 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 76f9efd..d7b5cee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -317,8 +317,7 @@ public class CheckpointCoordinator {
 			this::rescheduleTrigger,
 			this.clock,
 			this.minPauseBetweenCheckpoints,
-			this.pendingCheckpoints::size,
-			this.lock);
+			this.pendingCheckpoints::size);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -498,9 +497,7 @@ public class CheckpointCoordinator {
 		}
 
 		CheckpointTriggerRequest request = new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
-		requestDecider
-			.chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime)
-			.ifPresent(this::startTriggeringCheckpoint);
+		chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
 		return request.onCompletionPromise;
 	}
 
@@ -833,7 +830,19 @@ public class CheckpointCoordinator {
 	}
 
 	private void executeQueuedRequest() {
-		requestDecider.chooseQueuedRequestToExecute(isTriggering, lastCheckpointCompletionRelativeTime).ifPresent(this::startTriggeringCheckpoint);
+		chooseQueuedRequestToExecute().ifPresent(this::startTriggeringCheckpoint);
+	}
+
+	private Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute() {
+		synchronized (lock) {
+			return requestDecider.chooseQueuedRequestToExecute(isTriggering, lastCheckpointCompletionRelativeTime);
+		}
+	}
+
+	private Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest request) {
+		synchronized (lock) {
+			return requestDecider.chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1412,7 +1421,9 @@ public class CheckpointCoordinator {
 	@Deprecated
 	@VisibleForTesting
 	PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
-		return requestDecider.getTriggerRequestQueue();
+		synchronized (lock) {
+			return requestDecider.getTriggerRequestQueue();
+		}
 	}
 
 	public boolean isTriggering() {
@@ -1548,7 +1559,9 @@ public class CheckpointCoordinator {
 	}
 
 	int getNumQueuedRequests() {
-		return requestDecider.getNumQueuedRequests();
+		synchronized (lock) {
+			return requestDecider.getNumQueuedRequests();
+		}
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 6a9f779..124dc13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -25,8 +25,6 @@ import org.apache.flink.util.clock.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import java.util.Comparator;
 import java.util.NavigableSet;
 import java.util.Optional;
@@ -51,8 +49,6 @@ class CheckpointRequestDecider {
 	private final Clock clock;
 	private final long minPauseBetweenCheckpoints;
 	private final Supplier<Integer> pendingCheckpointsSizeSupplier;
-	private final Object lock;
-	@GuardedBy("lock")
 	private final NavigableSet<CheckpointTriggerRequest> queuedRequests = new TreeSet<>(checkpointTriggerRequestsComparator());
 	private final int maxQueuedRequests;
 
@@ -61,15 +57,13 @@ class CheckpointRequestDecider {
 			Consumer<Long> rescheduleTrigger,
 			Clock clock,
 			long minPauseBetweenCheckpoints,
-			Supplier<Integer> pendingCheckpointsSizeSupplier,
-			Object lock) {
+			Supplier<Integer> pendingCheckpointsSizeSupplier) {
 		this(
 			maxConcurrentCheckpointAttempts,
 			rescheduleTrigger,
 			clock,
 			minPauseBetweenCheckpoints,
 			pendingCheckpointsSizeSupplier,
-			lock,
 			DEFAULT_MAX_QUEUED_REQUESTS
 		);
 	}
@@ -80,7 +74,6 @@ class CheckpointRequestDecider {
 			Clock clock,
 			long minPauseBetweenCheckpoints,
 			Supplier<Integer> pendingCheckpointsSizeSupplier,
-			Object lock,
 			int maxQueuedRequests) {
 		Preconditions.checkArgument(maxConcurrentCheckpointAttempts > 0);
 		Preconditions.checkArgument(maxQueuedRequests > 0);
@@ -89,43 +82,37 @@ class CheckpointRequestDecider {
 		this.clock = clock;
 		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
 		this.pendingCheckpointsSizeSupplier = pendingCheckpointsSizeSupplier;
-		this.lock = lock;
 		this.maxQueuedRequests = maxQueuedRequests;
 	}
 
 	Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
-		synchronized (lock) {
-			if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
-				// there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
-				newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
-				return Optional.empty();
-			} else {
-				queuedRequests.add(newRequest);
-				if (queuedRequests.size() > maxQueuedRequests) {
-					queuedRequests.pollLast().completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
-				}
-				Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
-				request.ifPresent(CheckpointRequestDecider::logInQueueTime);
-				return request;
+		if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
+			// there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
+			newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
+			return Optional.empty();
+		} else {
+			queuedRequests.add(newRequest);
+			if (queuedRequests.size() > maxQueuedRequests) {
+				queuedRequests.pollLast().completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
 			}
-		}
-	}
-
-	Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
-		synchronized (lock) {
 			Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
 			request.ifPresent(CheckpointRequestDecider::logInQueueTime);
 			return request;
 		}
 	}
 
+	Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
+		Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
+		request.ifPresent(CheckpointRequestDecider::logInQueueTime);
+		return request;
+	}
+
 	/**
 	 * Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the
 	 * current state. Acquires a lock and may update the state.
 	 * @return request to execute, if any.
 	 */
 	private Optional<CheckpointTriggerRequest> chooseRequestToExecute(boolean isTriggering, long lastCompletionMs) {
-		Preconditions.checkState(Thread.holdsLock(lock));
 		if (isTriggering || queuedRequests.isEmpty()) {
 			return Optional.empty();
 		}
@@ -156,22 +143,17 @@ class CheckpointRequestDecider {
 	@VisibleForTesting
 	@Deprecated
 	PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
-		synchronized (lock) {
-			return new PriorityQueue<>(queuedRequests);
-		}
+		return new PriorityQueue<>(queuedRequests);
 	}
 
 	void abortAll(CheckpointException exception) {
-		Preconditions.checkState(Thread.holdsLock(lock));
 		while (!queuedRequests.isEmpty()) {
 			queuedRequests.pollFirst().completeExceptionally(exception);
 		}
 	}
 
 	int getNumQueuedRequests() {
-		synchronized (lock) {
-			return queuedRequests.size();
-		}
+		return queuedRequests.size();
 	}
 
 	private static Comparator<CheckpointTriggerRequest> checkpointTriggerRequestsComparator() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 82f5889..eb96a1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -77,7 +78,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -122,6 +125,46 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	@Test
+	public void testMinCheckpointPause() throws Exception {
+		// will use a different thread to allow checkpoint triggering before exiting from receiveAcknowledgeMessage
+		ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+		try {
+			int pause = 1000;
+			JobID jobId = new JobID();
+			ExecutionAttemptID attemptId = new ExecutionAttemptID();
+			ExecutionVertex vertex = mockExecutionVertex(attemptId);
+
+			CheckpointCoordinator coordinator = new CheckpointCoordinatorBuilder()
+				.setTimer(new ScheduledExecutorServiceAdapter(executorService))
+					.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+						.setCheckpointInterval(pause)
+						.setCheckpointTimeout(Long.MAX_VALUE)
+						.setMaxConcurrentCheckpoints(1)
+						.setMinPauseBetweenCheckpoints(pause)
+						.build())
+					.setTasksToTrigger(new ExecutionVertex[]{vertex})
+					.setTasksToWaitFor(new ExecutionVertex[]{vertex})
+					.setTasksToCommitTo(new ExecutionVertex[]{vertex})
+					.setJobId(jobId)
+				.build();
+			coordinator.startCheckpointScheduler();
+
+			coordinator.triggerCheckpoint(true); // trigger, execute, and later complete by receiveAcknowledgeMessage
+			coordinator.triggerCheckpoint(true); // enqueue and later see if it gets executed in the middle of receiveAcknowledgeMessage
+			while (coordinator.getNumberOfPendingCheckpoints() == 0) { // wait for at least 1 request to be fully processed
+				Thread.sleep(10);
+			}
+			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, 1L), TASK_MANAGER_LOCATION_INFO);
+			Thread.sleep(pause / 2);
+			assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+			Thread.sleep(pause);
+			assertEquals(1, coordinator.getNumberOfPendingCheckpoints());
+		} finally {
+			executorService.shutdownNow();
+		}
+	}
+
+	@Test
 	public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
 		try {
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index 55b19b6..90967b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -169,7 +169,7 @@ public class CheckpointRequestDeciderTest {
 	private void testTiming(CheckpointTriggerRequest request, TriggerExpectation expectation) {
 		final long pause = 10;
 		final ManualClock clock = new ManualClock();
-		final CheckpointRequestDecider decider = new CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, new Object(), Integer.MAX_VALUE);
+		final CheckpointRequestDecider decider = new CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, Integer.MAX_VALUE);
 
 		final long lastCompletionMs = clock.relativeTimeMillis();
 		final boolean isTriggering = false;
@@ -220,7 +220,7 @@ public class CheckpointRequestDeciderTest {
 	private CheckpointRequestDecider decider(int maxQueued, int maxPending, int minPause, AtomicInteger currentPending) {
 		ManualClock clock = new ManualClock();
 		clock.advanceTime(1, TimeUnit.DAYS);
-		return new CheckpointRequestDecider(maxPending, NO_OP, clock, minPause, currentPending::get, new Object(), maxQueued);
+		return new CheckpointRequestDecider(maxPending, NO_OP, clock, minPause, currentPending::get, maxQueued);
 	}
 
 	private static final Consumer<Long> NO_OP = unused -> {