You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/06 08:55:53 UTC

[incubator-seatunnel] branch dev updated: [Fix] Fix CheckpointCoordinator Can't Trigger Timeout Task (#4276)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e05d6c8d9 [Fix] Fix CheckpointCoordinator Can't Trigger Timeout Task (#4276)
e05d6c8d9 is described below

commit e05d6c8d93c250d9a2caf3f2fe70f0b26f3f6eec
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Mar 6 16:55:42 2023 +0800

    [Fix] Fix CheckpointCoordinator Can't Trigger Timeout Task (#4276)
    
    * [Fix] Fix CheckpointCoordinator Can't Trigger Timeout Task
---
 .../server/checkpoint/CheckpointCoordinator.java   | 74 +++++++++-------------
 .../server/checkpoint/PendingCheckpoint.java       |  9 +++
 2 files changed, 40 insertions(+), 43 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 260f09582..84054ac6e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -152,7 +151,7 @@ public class CheckpointCoordinator {
                 new ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1);
         this.scheduler =
                 Executors.newScheduledThreadPool(
-                        1,
+                        2,
                         runnable -> {
                             Thread thread = new Thread(runnable);
                             thread.setDaemon(true);
@@ -274,11 +273,14 @@ public class CheckpointCoordinator {
     }
 
     private void scheduleTriggerPendingCheckpoint(long delayMills) {
-        scheduler.schedule(() -> tryTriggerPendingCheckpoint(), delayMills, TimeUnit.MILLISECONDS);
+        scheduleTriggerPendingCheckpoint(CHECKPOINT_TYPE, delayMills);
     }
 
-    protected void tryTriggerPendingCheckpoint() {
-        tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
+    private void scheduleTriggerPendingCheckpoint(CheckpointType checkpointType, long delayMills) {
+        scheduler.schedule(
+                () -> tryTriggerPendingCheckpoint(checkpointType),
+                delayMills,
+                TimeUnit.MILLISECONDS);
     }
 
     protected void readyToClose(TaskLocation taskLocation) {
@@ -295,18 +297,27 @@ public class CheckpointCoordinator {
         shutdown = false;
         if (alreadyStarted) {
             isAllTaskReady = true;
-            tryTriggerPendingCheckpoint();
+            tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
         } else {
             isAllTaskReady = false;
         }
     }
 
     protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
-        synchronized (lock) {
-            if (Thread.currentThread().isInterrupted()) {
-                LOG.warn("currentThread already be interrupted, skip trigger checkpoint");
+        if (Thread.currentThread().isInterrupted()) {
+            LOG.warn("currentThread already be interrupted, skip trigger checkpoint");
+            return;
+        }
+        final long currentTimestamp = Instant.now().toEpochMilli();
+        if (notFinalCheckpoint(checkpointType)) {
+            if (currentTimestamp - latestTriggerTimestamp.get()
+                            < coordinatorConfig.getCheckpointInterval()
+                    || pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints()
+                    || !isAllTaskReady) {
                 return;
             }
+        }
+        synchronized (lock) {
             if (isCompleted() || isShutdown()) {
                 LOG.warn(
                         String.format(
@@ -318,16 +329,11 @@ public class CheckpointCoordinator {
                                 shutdown));
                 return;
             }
-            final long currentTimestamp = Instant.now().toEpochMilli();
-            if (notFinalCheckpoint(checkpointType)) {
-                if (currentTimestamp - latestTriggerTimestamp.get()
-                                < coordinatorConfig.getCheckpointInterval()
-                        || pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints()
-                        || !isAllTaskReady) {
+            if (!notFinalCheckpoint(checkpointType)) {
+                if (pendingCounter.get() > 0) {
+                    scheduleTriggerPendingCheckpoint(checkpointType, 500L);
                     return;
                 }
-            } else {
-                waitingPendingCheckpointDone();
             }
             CompletableFuture<PendingCheckpoint> pendingCheckpoint =
                     createPendingCheckpoint(currentTimestamp, checkpointType);
@@ -344,24 +350,6 @@ public class CheckpointCoordinator {
         return checkpointType.equals(CHECKPOINT_TYPE);
     }
 
-    @SuppressWarnings("checkstyle:MagicNumber")
-    private void waitingPendingCheckpointDone() {
-        while (pendingCounter.get() != 0) {
-            try {
-                LOG.info(
-                        "waiting pending checkpoint completed, pending counter: "
-                                + pendingCounter.get());
-                Thread.sleep(500);
-            } catch (InterruptedException e) {
-                throw new SeaTunnelEngineException(e);
-            }
-        }
-    }
-
-    private boolean canTriggered() {
-        return !isCompleted() && !isShutdown();
-    }
-
     public boolean isShutdown() {
         return shutdown;
     }
@@ -411,12 +399,7 @@ public class CheckpointCoordinator {
                             });
 
                     // Trigger the barrier and wait for all tasks to ACK
-                    LOG.debug(
-                            "trigger checkpoint barrier {}/{}/{}, {}",
-                            pendingCheckpoint.getJobId(),
-                            pendingCheckpoint.getPipelineId(),
-                            pendingCheckpoint.getCheckpointId(),
-                            pendingCheckpoint.getCheckpointType());
+                    LOG.debug("trigger checkpoint barrier {}", pendingCheckpoint.getInfo());
                     CompletableFuture<InvocationFuture<?>[]> completableFutureArray =
                             CompletableFuture.supplyAsync(
                                             () ->
@@ -437,7 +420,9 @@ public class CheckpointCoordinator {
                         return;
                     }
 
-                    LOG.debug("Start a scheduled task to prevent checkpoint timeouts");
+                    LOG.debug(
+                            "Start a scheduled task to prevent checkpoint timeouts for barrier "
+                                    + pendingCheckpoint.getInfo());
                     scheduler.schedule(
                             () -> {
                                 // If any task is not acked within the checkpoint timeout
@@ -445,6 +430,9 @@ public class CheckpointCoordinator {
                                                 != null
                                         && !pendingCheckpoint.isFullyAcknowledged()) {
                                     if (tolerableFailureCheckpoints-- <= 0) {
+                                        LOG.debug(
+                                                "timeout checkpoint: "
+                                                        + pendingCheckpoint.getInfo());
                                         handleCoordinatorError(
                                                 CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
                                     }
@@ -616,7 +604,7 @@ public class CheckpointCoordinator {
         if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
             // latest checkpoint completed time > checkpoint interval
             if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
-                tryTriggerPendingCheckpoint();
+                scheduleTriggerPendingCheckpoint(0L);
             }
         }
         completedCheckpoints.addLast(completedCheckpoint);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 4bb06bcef..dcdccdacf 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -172,4 +172,13 @@ public class PendingCheckpoint implements Checkpoint {
             completableFuture.completeExceptionally(failureCause);
         }
     }
+
+    public String getInfo() {
+        return String.format(
+                "%s/%s/%s, %s",
+                this.getJobId(),
+                this.getPipelineId(),
+                this.getCheckpointId(),
+                this.getCheckpointType());
+    }
 }