You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/08/13 10:31:31 UTC
[flink] 01/03: [FLINK-18856][checkpointing] Synchronize access to
CheckpointCoordinator.lastCheckpointCompletionRelativeTime
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 19468a25a906fd1d8bc18113ee6facd2128d5855
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Aug 12 15:21:52 2020 +0200
[FLINK-18856][checkpointing] Synchronize access to CheckpointCoordinator.lastCheckpointCompletionRelativeTime
---
.../runtime/checkpoint/CheckpointCoordinator.java | 29 ++++++++----
.../checkpoint/CheckpointRequestDecider.java | 52 +++++++---------------
.../checkpoint/CheckpointCoordinatorTest.java | 43 ++++++++++++++++++
.../checkpoint/CheckpointRequestDeciderTest.java | 4 +-
4 files changed, 83 insertions(+), 45 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 76f9efd..d7b5cee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -317,8 +317,7 @@ public class CheckpointCoordinator {
this::rescheduleTrigger,
this.clock,
this.minPauseBetweenCheckpoints,
- this.pendingCheckpoints::size,
- this.lock);
+ this.pendingCheckpoints::size);
}
// --------------------------------------------------------------------------------------------
@@ -498,9 +497,7 @@ public class CheckpointCoordinator {
}
CheckpointTriggerRequest request = new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
- requestDecider
- .chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime)
- .ifPresent(this::startTriggeringCheckpoint);
+ chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
return request.onCompletionPromise;
}
@@ -833,7 +830,19 @@ public class CheckpointCoordinator {
}
private void executeQueuedRequest() {
- requestDecider.chooseQueuedRequestToExecute(isTriggering, lastCheckpointCompletionRelativeTime).ifPresent(this::startTriggeringCheckpoint);
+ chooseQueuedRequestToExecute().ifPresent(this::startTriggeringCheckpoint);
+ }
+
+ private Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute() {
+ synchronized (lock) {
+ return requestDecider.chooseQueuedRequestToExecute(isTriggering, lastCheckpointCompletionRelativeTime);
+ }
+ }
+
+ private Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest request) {
+ synchronized (lock) {
+ return requestDecider.chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime);
+ }
}
// --------------------------------------------------------------------------------------------
@@ -1412,7 +1421,9 @@ public class CheckpointCoordinator {
@Deprecated
@VisibleForTesting
PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
- return requestDecider.getTriggerRequestQueue();
+ synchronized (lock) {
+ return requestDecider.getTriggerRequestQueue();
+ }
}
public boolean isTriggering() {
@@ -1548,7 +1559,9 @@ public class CheckpointCoordinator {
}
int getNumQueuedRequests() {
- return requestDecider.getNumQueuedRequests();
+ synchronized (lock) {
+ return requestDecider.getNumQueuedRequests();
+ }
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 6a9f779..124dc13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -25,8 +25,6 @@ import org.apache.flink.util.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.GuardedBy;
-
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.Optional;
@@ -51,8 +49,6 @@ class CheckpointRequestDecider {
private final Clock clock;
private final long minPauseBetweenCheckpoints;
private final Supplier<Integer> pendingCheckpointsSizeSupplier;
- private final Object lock;
- @GuardedBy("lock")
private final NavigableSet<CheckpointTriggerRequest> queuedRequests = new TreeSet<>(checkpointTriggerRequestsComparator());
private final int maxQueuedRequests;
@@ -61,15 +57,13 @@ class CheckpointRequestDecider {
Consumer<Long> rescheduleTrigger,
Clock clock,
long minPauseBetweenCheckpoints,
- Supplier<Integer> pendingCheckpointsSizeSupplier,
- Object lock) {
+ Supplier<Integer> pendingCheckpointsSizeSupplier) {
this(
maxConcurrentCheckpointAttempts,
rescheduleTrigger,
clock,
minPauseBetweenCheckpoints,
pendingCheckpointsSizeSupplier,
- lock,
DEFAULT_MAX_QUEUED_REQUESTS
);
}
@@ -80,7 +74,6 @@ class CheckpointRequestDecider {
Clock clock,
long minPauseBetweenCheckpoints,
Supplier<Integer> pendingCheckpointsSizeSupplier,
- Object lock,
int maxQueuedRequests) {
Preconditions.checkArgument(maxConcurrentCheckpointAttempts > 0);
Preconditions.checkArgument(maxQueuedRequests > 0);
@@ -89,43 +82,37 @@ class CheckpointRequestDecider {
this.clock = clock;
this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.pendingCheckpointsSizeSupplier = pendingCheckpointsSizeSupplier;
- this.lock = lock;
this.maxQueuedRequests = maxQueuedRequests;
}
Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
- synchronized (lock) {
- if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
- // there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
- newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
- return Optional.empty();
- } else {
- queuedRequests.add(newRequest);
- if (queuedRequests.size() > maxQueuedRequests) {
- queuedRequests.pollLast().completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
- }
- Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
- request.ifPresent(CheckpointRequestDecider::logInQueueTime);
- return request;
+ if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
+ // there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
+ newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
+ return Optional.empty();
+ } else {
+ queuedRequests.add(newRequest);
+ if (queuedRequests.size() > maxQueuedRequests) {
+ queuedRequests.pollLast().completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
}
- }
- }
-
- Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
- synchronized (lock) {
Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
request.ifPresent(CheckpointRequestDecider::logInQueueTime);
return request;
}
}
+ Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
+ Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
+ request.ifPresent(CheckpointRequestDecider::logInQueueTime);
+ return request;
+ }
+
/**
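- * Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the
- * current state. Acquires a lock and may update the state.
+ * Choose the next {@link CheckpointTriggerRequest request} to execute based on the queued requests and the
+ * current state. May update the state. Not synchronized internally: callers (CheckpointCoordinator) must hold their lock.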
* @return request to execute, if any.
*/
private Optional<CheckpointTriggerRequest> chooseRequestToExecute(boolean isTriggering, long lastCompletionMs) {
- Preconditions.checkState(Thread.holdsLock(lock));
if (isTriggering || queuedRequests.isEmpty()) {
return Optional.empty();
}
@@ -156,22 +143,17 @@ class CheckpointRequestDecider {
@VisibleForTesting
@Deprecated
PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
- synchronized (lock) {
- return new PriorityQueue<>(queuedRequests);
- }
+ return new PriorityQueue<>(queuedRequests);
}
void abortAll(CheckpointException exception) {
- Preconditions.checkState(Thread.holdsLock(lock));
while (!queuedRequests.isEmpty()) {
queuedRequests.pollFirst().completeExceptionally(exception);
}
}
int getNumQueuedRequests() {
- synchronized (lock) {
- return queuedRequests.size();
- }
+ return queuedRequests.size();
}
private static Comparator<CheckpointTriggerRequest> checkpointTriggerRequestsComparator() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 82f5889..eb96a1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -77,7 +78,9 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -122,6 +125,46 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
+ public void testMinCheckpointPause() throws Exception {
+ // will use a different thread to allow checkpoint triggering before exiting from receiveAcknowledgeMessage
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ try {
+ int pause = 1000;
+ JobID jobId = new JobID();
+ ExecutionAttemptID attemptId = new ExecutionAttemptID();
+ ExecutionVertex vertex = mockExecutionVertex(attemptId);
+
+ CheckpointCoordinator coordinator = new CheckpointCoordinatorBuilder()
+ .setTimer(new ScheduledExecutorServiceAdapter(executorService))
+ .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+ .setCheckpointInterval(pause)
+ .setCheckpointTimeout(Long.MAX_VALUE)
+ .setMaxConcurrentCheckpoints(1)
+ .setMinPauseBetweenCheckpoints(pause)
+ .build())
+ .setTasksToTrigger(new ExecutionVertex[]{vertex})
+ .setTasksToWaitFor(new ExecutionVertex[]{vertex})
+ .setTasksToCommitTo(new ExecutionVertex[]{vertex})
+ .setJobId(jobId)
+ .build();
+ coordinator.startCheckpointScheduler();
+
+ coordinator.triggerCheckpoint(true); // trigger, execute, and later complete by receiveAcknowledgeMessage
+ coordinator.triggerCheckpoint(true); // enqueue and later see if it gets executed in the middle of receiveAcknowledgeMessage
+ while (coordinator.getNumberOfPendingCheckpoints() == 0) { // wait for at least 1 request to be fully processed
+ Thread.sleep(10);
+ }
+ coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, 1L), TASK_MANAGER_LOCATION_INFO);
+ Thread.sleep(pause / 2);
+ assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+ Thread.sleep(pause);
+ assertEquals(1, coordinator.getNumberOfPendingCheckpoints());
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
+ @Test
public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index 55b19b6..90967b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -169,7 +169,7 @@ public class CheckpointRequestDeciderTest {
private void testTiming(CheckpointTriggerRequest request, TriggerExpectation expectation) {
final long pause = 10;
final ManualClock clock = new ManualClock();
- final CheckpointRequestDecider decider = new CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, new Object(), Integer.MAX_VALUE);
+ final CheckpointRequestDecider decider = new CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, Integer.MAX_VALUE);
final long lastCompletionMs = clock.relativeTimeMillis();
final boolean isTriggering = false;
@@ -220,7 +220,7 @@ public class CheckpointRequestDeciderTest {
private CheckpointRequestDecider decider(int maxQueued, int maxPending, int minPause, AtomicInteger currentPending) {
ManualClock clock = new ManualClock();
clock.advanceTime(1, TimeUnit.DAYS);
- return new CheckpointRequestDecider(maxPending, NO_OP, clock, minPause, currentPending::get, new Object(), maxQueued);
+ return new CheckpointRequestDecider(maxPending, NO_OP, clock, minPause, currentPending::get, maxQueued);
}
private static final Consumer<Long> NO_OP = unused -> {