You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/06 08:55:53 UTC
[incubator-seatunnel] branch dev updated: [Fix] Fix CheckpointCoordinator Can't Trigger Timeout Task (#4276)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e05d6c8d9 [Fix] Fix CheckpointCoordinator Can't Trigger Timeout Task (#4276)
e05d6c8d9 is described below
commit e05d6c8d93c250d9a2caf3f2fe70f0b26f3f6eec
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Mar 6 16:55:42 2023 +0800
[Fix] Fix CheckpointCoordinator Can't Trigger Timeout Task (#4276)
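* [Fix] Fix CheckpointCoordinator being unable to trigger the checkpoint timeout task
  - Increase the coordinator's scheduled thread pool from 1 to 2 threads.
  - For a final (non-CHECKPOINT_TYPE) checkpoint, re-schedule the trigger after 500 ms while pending checkpoints remain, replacing the removed waitingPendingCheckpointDone() sleep loop.
  - Evaluate the interrupt, interval, concurrency and task-readiness checks before entering synchronized (lock).
  - After a checkpoint completes, submit the next trigger through the scheduler instead of calling tryTriggerPendingCheckpoint() directly.
  - Add PendingCheckpoint.getInfo() and use it in the checkpoint debug log messages.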
---
.../server/checkpoint/CheckpointCoordinator.java | 74 +++++++++-------------
.../server/checkpoint/PendingCheckpoint.java | 9 +++
2 files changed, 40 insertions(+), 43 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 260f09582..84054ac6e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -152,7 +151,7 @@ public class CheckpointCoordinator {
new ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1);
this.scheduler =
Executors.newScheduledThreadPool(
- 1,
+ 2,
runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
@@ -274,11 +273,14 @@ public class CheckpointCoordinator {
}
private void scheduleTriggerPendingCheckpoint(long delayMills) {
- scheduler.schedule(() -> tryTriggerPendingCheckpoint(), delayMills, TimeUnit.MILLISECONDS);
+ scheduleTriggerPendingCheckpoint(CHECKPOINT_TYPE, delayMills);
}
- protected void tryTriggerPendingCheckpoint() {
- tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
+ private void scheduleTriggerPendingCheckpoint(CheckpointType checkpointType, long delayMills) {
+ scheduler.schedule(
+ () -> tryTriggerPendingCheckpoint(checkpointType),
+ delayMills,
+ TimeUnit.MILLISECONDS);
}
protected void readyToClose(TaskLocation taskLocation) {
@@ -295,18 +297,27 @@ public class CheckpointCoordinator {
shutdown = false;
if (alreadyStarted) {
isAllTaskReady = true;
- tryTriggerPendingCheckpoint();
+ tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
} else {
isAllTaskReady = false;
}
}
protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
- synchronized (lock) {
- if (Thread.currentThread().isInterrupted()) {
- LOG.warn("currentThread already be interrupted, skip trigger checkpoint");
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("currentThread already be interrupted, skip trigger checkpoint");
+ return;
+ }
+ final long currentTimestamp = Instant.now().toEpochMilli();
+ if (notFinalCheckpoint(checkpointType)) {
+ if (currentTimestamp - latestTriggerTimestamp.get()
+ < coordinatorConfig.getCheckpointInterval()
+ || pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints()
+ || !isAllTaskReady) {
return;
}
+ }
+ synchronized (lock) {
if (isCompleted() || isShutdown()) {
LOG.warn(
String.format(
@@ -318,16 +329,11 @@ public class CheckpointCoordinator {
shutdown));
return;
}
- final long currentTimestamp = Instant.now().toEpochMilli();
- if (notFinalCheckpoint(checkpointType)) {
- if (currentTimestamp - latestTriggerTimestamp.get()
- < coordinatorConfig.getCheckpointInterval()
- || pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints()
- || !isAllTaskReady) {
+ if (!notFinalCheckpoint(checkpointType)) {
+ if (pendingCounter.get() > 0) {
+ scheduleTriggerPendingCheckpoint(checkpointType, 500L);
return;
}
- } else {
- waitingPendingCheckpointDone();
}
CompletableFuture<PendingCheckpoint> pendingCheckpoint =
createPendingCheckpoint(currentTimestamp, checkpointType);
@@ -344,24 +350,6 @@ public class CheckpointCoordinator {
return checkpointType.equals(CHECKPOINT_TYPE);
}
- @SuppressWarnings("checkstyle:MagicNumber")
- private void waitingPendingCheckpointDone() {
- while (pendingCounter.get() != 0) {
- try {
- LOG.info(
- "waiting pending checkpoint completed, pending counter: "
- + pendingCounter.get());
- Thread.sleep(500);
- } catch (InterruptedException e) {
- throw new SeaTunnelEngineException(e);
- }
- }
- }
-
- private boolean canTriggered() {
- return !isCompleted() && !isShutdown();
- }
-
public boolean isShutdown() {
return shutdown;
}
@@ -411,12 +399,7 @@ public class CheckpointCoordinator {
});
// Trigger the barrier and wait for all tasks to ACK
- LOG.debug(
- "trigger checkpoint barrier {}/{}/{}, {}",
- pendingCheckpoint.getJobId(),
- pendingCheckpoint.getPipelineId(),
- pendingCheckpoint.getCheckpointId(),
- pendingCheckpoint.getCheckpointType());
+ LOG.debug("trigger checkpoint barrier {}", pendingCheckpoint.getInfo());
CompletableFuture<InvocationFuture<?>[]> completableFutureArray =
CompletableFuture.supplyAsync(
() ->
@@ -437,7 +420,9 @@ public class CheckpointCoordinator {
return;
}
- LOG.debug("Start a scheduled task to prevent checkpoint timeouts");
+ LOG.debug(
+ "Start a scheduled task to prevent checkpoint timeouts for barrier "
+ + pendingCheckpoint.getInfo());
scheduler.schedule(
() -> {
// If any task is not acked within the checkpoint timeout
@@ -445,6 +430,9 @@ public class CheckpointCoordinator {
!= null
&& !pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0) {
+ LOG.debug(
+ "timeout checkpoint: "
+ + pendingCheckpoint.getInfo());
handleCoordinatorError(
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
}
@@ -616,7 +604,7 @@ public class CheckpointCoordinator {
if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
// latest checkpoint completed time > checkpoint interval
if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
- tryTriggerPendingCheckpoint();
+ scheduleTriggerPendingCheckpoint(0L);
}
}
completedCheckpoints.addLast(completedCheckpoint);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 4bb06bcef..dcdccdacf 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -172,4 +172,13 @@ public class PendingCheckpoint implements Checkpoint {
completableFuture.completeExceptionally(failureCause);
}
}
+
+ public String getInfo() {
+ return String.format(
+ "%s/%s/%s, %s",
+ this.getJobId(),
+ this.getPipelineId(),
+ this.getCheckpointId(),
+ this.getCheckpointType());
+ }
}