You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/05/30 07:48:23 UTC

[seatunnel] branch dev updated: [Improve][Zeta] async execute checkpoint trigger and other block method (#4846)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 83fa97bc4 [Improve][Zeta] async execute checkpoint trigger and other block method (#4846)
83fa97bc4 is described below

commit 83fa97bc4afd61db8647602064138bcc700a00b3
Author: Jia Fan <fa...@qq.com>
AuthorDate: Tue May 30 15:48:17 2023 +0800

    [Improve][Zeta] async execute checkpoint trigger and other block method (#4846)
    
    * [Improve][Zeta] Execute checkpoint trigger asynchronously
    
    * [Bug][Zeta] Fix Zeta failing to recycle threads belonging to abnormal tasks
    
    * [Improve][Zeta] Move `restoreState` and `addSplitsBack` execution to TaskExecutionService
    
    * [Improve][Zeta] Move `receivedReader` execution to TaskExecutionService
---
 config/hazelcast.yaml                              |  2 +-
 .../engine/server/TaskExecutionService.java        | 47 ++++++++++++++++--
 .../server/checkpoint/CheckpointCloseReason.java   |  3 +-
 .../server/checkpoint/CheckpointCoordinator.java   |  6 +++
 .../server/checkpoint/CheckpointManager.java       |  4 ++
 .../CheckpointBarrierTriggerOperation.java         | 25 ++++++----
 .../operation/CheckpointErrorReportOperation.java  | 56 ++++++++++++++++++++++
 .../operation/NotifyTaskRestoreOperation.java      | 32 +++++++++++--
 .../engine/server/dag/physical/PhysicalPlan.java   |  2 +-
 .../seatunnel/engine/server/execution/Task.java    |  4 ++
 .../server/execution/TaskExecutionContext.java     | 10 +++-
 .../serializable/CheckpointDataSerializerHook.java |  5 ++
 .../seatunnel/engine/server/task/AbstractTask.java |  1 +
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  3 +-
 .../group/queue/IntermediateBlockingQueue.java     |  2 +-
 .../operation/checkpoint/BarrierFlowOperation.java | 28 +++++++----
 .../operation/source/RestoredSplitOperation.java   | 16 ++++++-
 .../operation/source/SourceRegisterOperation.java  | 23 +++++++--
 18 files changed, 232 insertions(+), 37 deletions(-)

diff --git a/config/hazelcast.yaml b/config/hazelcast.yaml
index 87f607960..934712a82 100644
--- a/config/hazelcast.yaml
+++ b/config/hazelcast.yaml
@@ -37,5 +37,5 @@ hazelcast:
     hazelcast.invocation.max.retry.count: 20
     hazelcast.tcp.join.port.try.count: 30
     hazelcast.logging.type: log4j2
-    hazelcast.operation.generic.thread.count: 1000
+    hazelcast.operation.generic.thread.count: 50
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 0b95baded..3c735148b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -70,6 +70,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -122,6 +123,10 @@ public class TaskExecutionService implements DynamicMetricsProvider {
             new ConcurrentHashMap<>();
     private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> finishedExecutionContexts =
             new ConcurrentHashMap<>();
+
+    private final ConcurrentMap<TaskGroupLocation, Map<String, CompletableFuture<?>>>
+            taskAsyncFunctionFuture = new ConcurrentHashMap<>();
+
     private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures =
             new ConcurrentHashMap<>();
     private final SeaTunnelConfig seaTunnelConfig;
@@ -305,7 +310,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
                             .peek(
                                     task -> {
                                         TaskExecutionContext taskExecutionContext =
-                                                new TaskExecutionContext(task, nodeEngine);
+                                                new TaskExecutionContext(task, nodeEngine, this);
                                         task.setTaskExecutionContext(taskExecutionContext);
                                         taskExecutionContextMap.put(
                                                 task.getTaskID(), taskExecutionContext);
@@ -405,6 +410,25 @@ public class TaskExecutionService implements DynamicMetricsProvider {
         }
     }
 
+    public void asyncExecuteFunction(TaskGroupLocation taskGroupLocation, Runnable task) {
+        String id = UUID.randomUUID().toString();
+        logger.fine("accept async execute function from " + taskGroupLocation + " with id " + id);
+        if (!taskAsyncFunctionFuture.containsKey(taskGroupLocation)) {
+            taskAsyncFunctionFuture.put(taskGroupLocation, new ConcurrentHashMap<>());
+        }
+        CompletableFuture<?> future = CompletableFuture.runAsync(task, executorService);
+        taskAsyncFunctionFuture.get(taskGroupLocation).put(id, future);
+        future.whenComplete(
+                (r, e) -> {
+                    taskAsyncFunctionFuture.get(taskGroupLocation).remove(id);
+                    logger.fine(
+                            "remove async execute function from "
+                                    + taskGroupLocation
+                                    + " with id "
+                                    + id);
+                });
+    }
+
     public void notifyCleanTaskGroupContext(TaskGroupLocation taskGroupLocation) {
         finishedExecutionContexts.remove(taskGroupLocation);
     }
@@ -750,7 +774,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
                                                     "cancellationFuture should be completed exceptionally");
                                 }
                                 exception(e);
-                                cancelAllTask();
+                                cancelAllTask(taskGroup.getTaskGroupLocation());
                             }));
         }
 
@@ -758,13 +782,27 @@ public class TaskExecutionService implements DynamicMetricsProvider {
             executionException.compareAndSet(null, t);
         }
 
-        private void cancelAllTask() {
+        private void cancelAllTask(TaskGroupLocation taskGroupLocation) {
             try {
                 blockingFutures.forEach(f -> f.cancel(true));
                 currRunningTaskFuture.values().forEach(f -> f.cancel(true));
             } catch (CancellationException ignore) {
                 // ignore
             }
+            cancelAsyncFunction(taskGroupLocation);
+        }
+
+        private void cancelAsyncFunction(TaskGroupLocation taskGroupLocation) {
+            try {
+                if (taskAsyncFunctionFuture.containsKey(taskGroupLocation)) {
+                    taskAsyncFunctionFuture.remove(taskGroupLocation).values().stream()
+                            .filter(f -> !f.isDone())
+                            .filter(f -> !f.isCancelled())
+                            .forEach(f -> f.cancel(true));
+                }
+            } catch (CancellationException ignore) {
+                // ignore
+            }
         }
 
         void taskDone(Task task) {
@@ -778,6 +816,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
                 finishedExecutionContexts.put(
                         taskGroupLocation, executionContexts.remove(taskGroupLocation));
                 cancellationFutures.remove(taskGroupLocation);
+                cancelAsyncFunction(taskGroupLocation);
                 if (ex == null) {
                     future.complete(
                             new TaskExecutionState(taskGroupLocation, ExecutionState.FINISHED));
@@ -792,7 +831,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
                 }
             }
             if (!isCancel.get() && ex != null) {
-                cancelAllTask();
+                cancelAllTask(taskGroupLocation);
             }
         }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index 7962ebc75..ae1af4d41 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -19,7 +19,8 @@ package org.apache.seatunnel.engine.server.checkpoint;
 
 public enum CheckpointCloseReason {
     PIPELINE_END("Pipeline turn to end state."),
-    CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
+    CHECKPOINT_EXPIRED(
+            "Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml"),
     CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
     CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
     CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 091fe99be..1b33c4c64 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
@@ -279,6 +280,11 @@ public class CheckpointCoordinator {
                 .toArray(InvocationFuture[]::new);
     }
 
+    public void reportCheckpointErrorFromTask(String errorMsg) {
+        handleCoordinatorError(
+                CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR, new SeaTunnelException(errorMsg));
+    }
+
     private void scheduleTriggerPendingCheckpoint(long delayMills) {
         scheduleTriggerPendingCheckpoint(CHECKPOINT_TYPE, delayMills);
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 48bc9a454..39d25636c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -174,6 +174,10 @@ public class CheckpointManager {
         return getCheckpointCoordinator(taskLocation.getPipelineId());
     }
 
+    public void reportCheckpointErrorFromTask(TaskLocation taskLocation, String errorMsg) {
+        getCheckpointCoordinator(taskLocation).reportCheckpointErrorFromTask(errorMsg);
+    }
+
     private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
         CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
         if (coordinator == null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
index cd9d54837..8ed7985fd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
@@ -34,8 +34,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 
-import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
-
 @NoArgsConstructor
 @Slf4j
 public class CheckpointBarrierTriggerOperation extends TaskOperation {
@@ -79,12 +77,23 @@ public class CheckpointBarrierTriggerOperation extends TaskOperation {
                                     .getExecutionContext(taskLocation.getTaskGroupLocation())
                                     .getTaskGroup()
                                     .getTask(taskLocation.getTaskID());
-                    try {
-                        log.debug("CheckpointBarrierTriggerOperation [{}]", taskLocation);
-                        task.triggerBarrier(barrier);
-                    } catch (Exception e) {
-                        sneakyThrow(e);
-                    }
+                    task.getExecutionContext()
+                            .getTaskExecutionService()
+                            .asyncExecuteFunction(
+                                    taskLocation.getTaskGroupLocation(),
+                                    () -> {
+                                        try {
+                                            log.debug(
+                                                    "CheckpointBarrierTriggerOperation [{}]",
+                                                    taskLocation);
+                                            task.triggerBarrier(barrier);
+                                        } catch (Exception e) {
+                                            task.getExecutionContext()
+                                                    .sendToMaster(
+                                                            new CheckpointErrorReportOperation(
+                                                                    taskLocation, e));
+                                        }
+                                    });
                     return null;
                 },
                 new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointErrorReportOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointErrorReportOperation.java
new file mode 100644
index 000000000..75ec924b7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointErrorReportOperation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint.operation;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.serializable.CheckpointDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
+
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+public class CheckpointErrorReportOperation extends TaskOperation {
+
+    private String errorMsg;
+
+    public CheckpointErrorReportOperation(TaskLocation taskLocation, Throwable e) {
+        super(taskLocation);
+        this.errorMsg = ExceptionUtils.getMessage(e);
+    }
+
+    @Override
+    public void run() throws Exception {
+        SeaTunnelServer server = getService();
+        server.getCoordinatorService()
+                .getJobMaster(taskLocation.getJobId())
+                .getCheckpointManager()
+                .reportCheckpointErrorFromTask(taskLocation, errorMsg);
+    }
+
+    @Override
+    public int getFactoryId() {
+        return CheckpointDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return CheckpointDataSerializerHook.CHECKPOINT_ERROR_REPORT_OPERATOR;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
index 869252458..c50abad48 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
@@ -91,11 +91,33 @@ public class NotifyTaskRestoreOperation extends TaskOperation {
                     Task task = groupContext.getTaskGroup().getTask(taskLocation.getTaskID());
                     try {
                         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-                        Thread.currentThread().setContextClassLoader(groupContext.getClassLoader());
-                        log.debug("NotifyTaskRestoreOperation.restoreState " + restoredState);
-                        task.restoreState(restoredState);
-                        log.debug("NotifyTaskRestoreOperation.finished " + restoredState);
-                        Thread.currentThread().setContextClassLoader(classLoader);
+                        task.getExecutionContext()
+                                .getTaskExecutionService()
+                                .asyncExecuteFunction(
+                                        taskLocation.getTaskGroupLocation(),
+                                        () -> {
+                                            Thread.currentThread()
+                                                    .setContextClassLoader(
+                                                            groupContext.getClassLoader());
+                                            try {
+                                                log.debug(
+                                                        "NotifyTaskRestoreOperation.restoreState "
+                                                                + restoredState);
+                                                task.restoreState(restoredState);
+                                                log.debug(
+                                                        "NotifyTaskRestoreOperation.finished "
+                                                                + restoredState);
+                                            } catch (Exception e) {
+                                                task.getExecutionContext()
+                                                        .sendToMaster(
+                                                                new CheckpointErrorReportOperation(
+                                                                        taskLocation, e));
+                                            } finally {
+                                                Thread.currentThread()
+                                                        .setContextClassLoader(classLoader);
+                                            }
+                                        });
+
                     } catch (Exception e) {
                         throw new SeaTunnelException(e);
                     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index f8d59188b..4a81fdf63 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -174,7 +174,7 @@ public class PhysicalPlan {
                         }
 
                         if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
-                            JobStatus jobStatus = JobStatus.FAILING;
+                            JobStatus jobStatus;
                             if (failedPipelineNum.get() > 0) {
                                 jobStatus = JobStatus.FAILING;
                                 updateJobState(jobStatus);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index 58afa695d..d2b184e85 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -49,6 +49,10 @@ public interface Task
 
     default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext) {}
 
+    default TaskExecutionContext getExecutionContext() {
+        return null;
+    }
+
     default void triggerBarrier(Barrier barrier) throws Exception {}
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 50faac151..55249babc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.execution;
 
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
 import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
@@ -33,10 +34,13 @@ public class TaskExecutionContext {
 
     private final Task task;
     private final NodeEngineImpl nodeEngine;
+    private final TaskExecutionService taskExecutionService;
 
-    public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine) {
+    public TaskExecutionContext(
+            Task task, NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService) {
         this.task = task;
         this.nodeEngine = nodeEngine;
+        this.taskExecutionService = taskExecutionService;
     }
 
     public <E> InvocationFuture<E> sendToMaster(Operation operation) {
@@ -61,6 +65,10 @@ public class TaskExecutionContext {
         return (T) task;
     }
 
+    public TaskExecutionService getTaskExecutionService() {
+        return taskExecutionService;
+    }
+
     public HazelcastInstance getInstance() {
         return nodeEngine.getHazelcastInstance();
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java
index 4b4287641..3349a1070 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
 
 import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
 import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
+import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
 import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation;
 import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation;
 import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskStartOperation;
@@ -41,6 +42,8 @@ public final class CheckpointDataSerializerHook implements DataSerializerHook {
     public static final int NOTIFY_TASK_RESTORE_OPERATOR = 5;
     public static final int NOTIFY_TASK_START_OPERATOR = 6;
 
+    public static final int CHECKPOINT_ERROR_REPORT_OPERATOR = 7;
+
     public static final int FACTORY_ID =
             FactoryIdHelper.getFactoryId(
                     SeaTunnelFactoryIdConstant.SEATUNNEL_CHECKPOINT_DATA_SERIALIZER_FACTORY,
@@ -73,6 +76,8 @@ public final class CheckpointDataSerializerHook implements DataSerializerHook {
                     return new NotifyTaskRestoreOperation();
                 case NOTIFY_TASK_START_OPERATOR:
                     return new NotifyTaskStartOperation();
+                case CHECKPOINT_ERROR_REPORT_OPERATOR:
+                    return new CheckpointErrorReportOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 3129c0b94..4fc23f34c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -68,6 +68,7 @@ public abstract class AbstractTask implements Task {
         this.executionContext = taskExecutionContext;
     }
 
+    @Override
     public TaskExecutionContext getExecutionContext() {
         return executionContext;
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 0346b7313..17860945e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -190,7 +190,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
                                 .getExecutionContext()
                                 .sendToMember(
                                         new BarrierFlowOperation(barrier, committerTaskLocation),
-                                        committerTaskAddress);
+                                        committerTaskAddress)
+                                .join();
                     }
                 }
                 runningTask.ack(barrier);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
index c08150c2b..5b2de4c50 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
@@ -57,7 +57,7 @@ public class IntermediateBlockingQueue extends AbstractIntermediateQueue<Blockin
 
     @Override
     public void close() throws IOException {
-        // nothing
+        getIntermediateQueue().clear();
     }
 
     private void handleRecord(Record<?> record, ConsumerWithException<Record<?>> consumer)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
index 02b224443..89d3b3805 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
@@ -17,10 +17,10 @@
 
 package org.apache.seatunnel.engine.server.task.operation.checkpoint;
 
-import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
 import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -35,8 +35,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 
-import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
-
 @NoArgsConstructor
 @Slf4j
 public class BarrierFlowOperation extends TaskOperation {
@@ -80,13 +78,23 @@ public class BarrierFlowOperation extends TaskOperation {
                                     .getExecutionContext(taskLocation.getTaskGroupLocation())
                                     .getTaskGroup()
                                     .getTask(taskLocation.getTaskID());
-                    try {
-                        log.debug("BarrierFlowOperation [{}]", taskLocation);
-                        task.triggerBarrier(barrier);
-                    } catch (Exception e) {
-                        log.warn(ExceptionUtils.getMessage(e));
-                        sneakyThrow(e);
-                    }
+                    task.getExecutionContext()
+                            .getTaskExecutionService()
+                            .asyncExecuteFunction(
+                                    taskLocation.getTaskGroupLocation(),
+                                    () -> {
+                                        try {
+                                            log.debug(
+                                                    "CheckpointBarrierTriggerOperation [{}]",
+                                                    taskLocation);
+                                            task.triggerBarrier(barrier);
+                                        } catch (Exception e) {
+                                            task.getExecutionContext()
+                                                    .sendToMaster(
+                                                            new CheckpointErrorReportOperation(
+                                                                    taskLocation, e));
+                                        }
+                                    });
                     return null;
                 },
                 new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 65abfd7d8..0c9c3d95c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
 import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -95,7 +96,20 @@ public class RestoredSplitOperation extends TaskOperation {
                                     .collect(Collectors.toList());
                     SourceSplitEnumeratorTask<SourceSplit> task =
                             taskExecutionService.getTask(taskLocation);
-                    task.addSplitsBack(deserialize, subtaskIndex);
+                    task.getExecutionContext()
+                            .getTaskExecutionService()
+                            .asyncExecuteFunction(
+                                    taskLocation.getTaskGroupLocation(),
+                                    () -> {
+                                        try {
+                                            task.addSplitsBack(deserialize, subtaskIndex);
+                                        } catch (Exception e) {
+                                            task.getExecutionContext()
+                                                    .sendToMaster(
+                                                            new CheckpointErrorReportOperation(
+                                                                    taskLocation, e));
+                                        }
+                                    });
                     return null;
                 },
                 new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 7e0d5ce3a..faf44b4c4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.operation.source;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
 import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -60,11 +61,27 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
                                     .getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
                                     .getClassLoader();
                     ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
-                    Thread.currentThread().setContextClassLoader(classLoader);
                     SourceSplitEnumeratorTask<?> task =
                             server.getTaskExecutionService().getTask(enumeratorTaskID);
-                    task.receivedReader(readerTaskID, readerAddress);
-                    Thread.currentThread().setContextClassLoader(oldClassLoader);
+                    task.getExecutionContext()
+                            .getTaskExecutionService()
+                            .asyncExecuteFunction(
+                                    enumeratorTaskID.getTaskGroupLocation(),
+                                    () -> {
+                                        try {
+                                            Thread.currentThread()
+                                                    .setContextClassLoader(classLoader);
+                                            task.receivedReader(readerTaskID, readerAddress);
+                                        } catch (Exception e) {
+                                            task.getExecutionContext()
+                                                    .sendToMaster(
+                                                            new CheckpointErrorReportOperation(
+                                                                    enumeratorTaskID, e));
+                                        } finally {
+                                            Thread.currentThread()
+                                                    .setContextClassLoader(oldClassLoader);
+                                        }
+                                    });
                     return null;
                 },
                 new RetryUtils.RetryMaterial(