You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/08/13 10:31:33 UTC
[flink] 03/03: [hotfix][docs] Add javadoc to CheckpointRequestDecider
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 68d6bf573b562839847f11f3b5128d12aa7a5c67
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Aug 12 15:37:05 2020 +0200
[hotfix][docs] Add javadoc to CheckpointRequestDecider
---
.../checkpoint/CheckpointRequestDecider.java | 23 +++++++++++++++++++++-
1 file changed, 22 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 124dc13..455766b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -38,6 +38,19 @@ import static java.lang.System.identityHashCode;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS;
+/**
+ * Decides whether a {@link CheckpointCoordinator.CheckpointTriggerRequest checkpoint request} should be executed,
+ * dropped or postponed.
+ * Dropped requests are failed immediately.
+ * Postponed requests are enqueued into a queue and can be dequeued later.
+ * <p>
+ * Decision is made according to:<ul>
+ * <li>checkpoint properties (e.g. isForce, isPeriodic)</li>
+ * <li>checkpointing configuration (e.g. max concurrent checkpoints, min pause)</li>
+ * <li>current state (other queued requests, pending checkpoints, last checkpoint completion time)</li>
+ * </ul>
+ * </p>
+ */
@SuppressWarnings("ConstantConditions")
class CheckpointRequestDecider {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointRequestDecider.class);
@@ -85,6 +98,10 @@ class CheckpointRequestDecider {
this.maxQueuedRequests = maxQueuedRequests;
}
+ /**
+ * Submit a new checkpoint request and decide whether it or some other request can be executed.
+ * @return request that should be executed
+ */
Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
// there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
@@ -101,6 +118,10 @@ class CheckpointRequestDecider {
}
}
+ /**
+ * Choose one of the queued requests to execute, if any.
+ * @return request that should be executed
+ */
Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
request.ifPresent(CheckpointRequestDecider::logInQueueTime);
@@ -110,7 +131,7 @@ class CheckpointRequestDecider {
/**
* Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the
* current state. Acquires a lock and may update the state.
- * @return request to execute, if any.
+ * @return request that should be executed
*/
private Optional<CheckpointTriggerRequest> chooseRequestToExecute(boolean isTriggering, long lastCompletionMs) {
if (isTriggering || queuedRequests.isEmpty()) {