You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/08/13 10:31:33 UTC

[flink] 03/03: [hotfix][docs] Add javadoc to CheckpointRequestDecider

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68d6bf573b562839847f11f3b5128d12aa7a5c67
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Aug 12 15:37:05 2020 +0200

    [hotfix][docs] Add javadoc to CheckpointRequestDecider
---
 .../checkpoint/CheckpointRequestDecider.java       | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 124dc13..455766b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -38,6 +38,19 @@ import static java.lang.System.identityHashCode;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS;
 
+/**
+ * Decides whether a {@link CheckpointCoordinator.CheckpointTriggerRequest checkpoint request} should be executed,
+ * dropped or postponed.
+ * Dropped requests are failed immediately.
+ * Postponed requests are enqueued into a queue and can be dequeued later.
+ * <p>
+ * Decision is made according to:<ul>
+ * <li>checkpoint properties (e.g. isForce, isPeriodic)</li>
+ * <li>checkpointing configuration (e.g. max concurrent checkpoints, min pause)</li>
+ * <li>current state (other queued requests, pending checkpoints, last checkpoint completion time)</li>
+ * </ul>
+ * </p>
+ */
 @SuppressWarnings("ConstantConditions")
 class CheckpointRequestDecider {
 	private static final Logger LOG = LoggerFactory.getLogger(CheckpointRequestDecider.class);
@@ -85,6 +98,10 @@ class CheckpointRequestDecider {
 		this.maxQueuedRequests = maxQueuedRequests;
 	}
 
+	/**
+	 * Submit a new checkpoint request and decide whether it or some other request can be executed.
+	 * @return request that should be executed
+	 */
 	Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
 		if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
 			// there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
@@ -101,6 +118,10 @@ class CheckpointRequestDecider {
 		}
 	}
 
+	/**
+	 * Choose one of the queued requests to execute, if any.
+	 * @return request that should be executed
+	 */
 	Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
 		Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
 		request.ifPresent(CheckpointRequestDecider::logInQueueTime);
@@ -110,7 +131,7 @@ class CheckpointRequestDecider {
 	/**
 	 * Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the
 	 * current state. Acquires a lock and may update the state.
-	 * @return request to execute, if any.
+	 * @return request that should be executed
 	 */
 	private Optional<CheckpointTriggerRequest> chooseRequestToExecute(boolean isTriggering, long lastCompletionMs) {
 		if (isTriggering || queuedRequests.isEmpty()) {