You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original archive page) is not preserved in this plain-text rendering.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/05 07:57:02 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][Task] Move SourceSplitEnumeratorTask to worker node (#2628)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new b830741c7 [Engine][Task] Move SourceSplitEnumeratorTask to worker node (#2628)
b830741c7 is described below

commit b830741c747fa04fb1f5bc9f08d6ca9fa9260572
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Sep 5 15:56:55 2022 +0800

    [Engine][Task] Move SourceSplitEnumeratorTask to worker node (#2628)
    
    * [Engine][Task] Move SourceSplitEnumeratorTask to worker node
---
 .../apache/seatunnel/common/utils/RetryUtils.java  |  4 +-
 seatunnel-engine/seatunnel-engine-client/pom.xml   |  1 -
 seatunnel-engine/seatunnel-engine-server/pom.xml   |  1 -
 .../engine/server/TaskExecutionService.java        |  9 +---
 .../engine/server/dag/physical/PhysicalVertex.java | 25 +++++----
 .../server/execution/TaskExecutionContext.java     |  9 +---
 .../seatunnel/engine/server/master/JobMaster.java  | 28 +++++++---
 .../engine/server/scheduler/JobScheduler.java      |  7 ++-
 .../server/scheduler/PipelineBaseScheduler.java    | 63 ++++++++++++----------
 .../serializable/TaskDataSerializerHook.java       |  6 +++
 .../server/service/slot/DefaultSlotService.java    |  7 ++-
 .../engine/server/service/slot/SlotContext.java    |  1 -
 .../engine/server/service/slot/SlotService.java    |  2 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java | 28 +++++++---
 .../server/task/flow/SourceFlowLifeCycle.java      | 36 ++++++++-----
 .../server/task/operation/DeployTaskOperation.java |  4 +-
 ...tion.java => GetTaskGroupAddressOperation.java} | 60 +++++++++++++--------
 17 files changed, 180 insertions(+), 111 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
index f6ac9a694..55492a4e4 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
@@ -34,12 +34,14 @@ public class RetryUtils {
         if (retryMaterial.getRetryTimes() < 0) {
             throw new IllegalArgumentException("Retry times must be greater than 0");
         }
+        Exception lastE;
         int i = 0;
         do {
             i++;
             try {
                 return execution.execute();
             } catch (Exception e) {
+                lastE = e;
                 if (retryCondition != null && !retryCondition.canRetry(e)) {
                     if (retryMaterial.shouldThrowException()) {
                         throw e;
@@ -50,7 +52,7 @@ public class RetryUtils {
             }
         } while (i <= retryTimes);
         if (retryMaterial.shouldThrowException()) {
-            throw new RuntimeException("Execute given execution failed after retry " + retryTimes + " times");
+            throw new RuntimeException("Execute given execution failed after retry " + retryTimes + " times", lastE);
         }
         return null;
     }
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
index 33fd88a8c..b24ad2088 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -59,7 +59,6 @@
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>checkpoint-storage-local-file</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml
index c17fb0bbd..d02bcf67c 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -43,7 +43,6 @@
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>checkpoint-storage-local-file</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.hazelcast</groupId>
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 82a6bbf52..81875a852 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -39,7 +39,6 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
 import org.apache.seatunnel.engine.server.operation.NotifyTaskStatusOperation;
-import org.apache.seatunnel.engine.server.service.slot.SlotContext;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 
 import com.google.common.collect.Lists;
@@ -85,7 +84,6 @@ public class TaskExecutionService {
     // key: TaskID
     private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
     private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
-    private SlotContext slotContext;
 
     public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -102,10 +100,6 @@ public class TaskExecutionService {
         executorService.shutdownNow();
     }
 
-    public void setSlotContext(SlotContext slotContext) {
-        this.slotContext = slotContext;
-    }
-
     public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
         return executionContexts.get(taskGroupLocation);
     }
@@ -186,8 +180,7 @@ public class TaskExecutionService {
             final Map<Boolean, List<Task>> byCooperation =
                 tasks.stream()
                     .peek(task -> {
-                        TaskExecutionContext taskExecutionContext = new TaskExecutionContext(task, nodeEngine,
-                                slotContext);
+                        TaskExecutionContext taskExecutionContext = new TaskExecutionContext(task, nodeEngine);
                         task.setTaskExecutionContext(taskExecutionContext);
                         taskExecutionContextMap.put(task.getTaskID(), taskExecutionContext);
                     })
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 08ab71c15..1e8dc0ecf 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -144,26 +144,22 @@ public class PhysicalVertex {
                 subTaskGroupIndex + 1,
                 parallelism);
         this.taskFuture = new CompletableFuture<>();
-        this.taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, physicalVertexId);
+        this.taskGroupLocation = taskGroup.getTaskGroupLocation();
     }
 
     public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
         return new PassiveCompletableFuture<>(this.taskFuture);
     }
 
-    public void deployOnMaster() {
-        currentExecutionAddress = nodeEngine.getMasterAddress();
+    private void deployOnLocal(@NonNull SlotProfile slotProfile) {
         deployInternal(taskGroupImmutableInformation -> {
             SeaTunnelServer server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
-            return new PassiveCompletableFuture<>(server.getTaskExecutionService()
-                .deployTask(taskGroupImmutableInformation));
+            return new PassiveCompletableFuture<>(server.getSlotService().getSlotContext(slotProfile)
+                .getTaskExecutionService().deployTask(taskGroupImmutableInformation));
         });
     }
 
-    @SuppressWarnings("checkstyle:MagicNumber")
-    // This method must not throw an exception
-    public void deploy(@NonNull SlotProfile slotProfile) {
-        currentExecutionAddress = slotProfile.getWorker();
+    private void deployOnRemote(@NonNull SlotProfile slotProfile) {
         deployInternal(taskGroupImmutableInformation -> new PassiveCompletableFuture<>(
             nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
                     new DeployTaskOperation(slotProfile,
@@ -172,6 +168,17 @@ public class PhysicalVertex {
                 .invoke()));
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
+    // This method must not throw an exception
+    public void deploy(@NonNull SlotProfile slotProfile) {
+        currentExecutionAddress = slotProfile.getWorker();
+        if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
+            deployOnLocal(slotProfile);
+        } else {
+            deployOnRemote(slotProfile);
+        }
+    }
+
     private void deployInternal(Function<TaskGroupImmutableInformation, PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
         TaskGroupImmutableInformation taskGroupImmutableInformation = getTaskGroupImmutableInformation();
         PassiveCompletableFuture<TaskExecutionState> completeFuture;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 5b8add35c..4ba8d075f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
-import org.apache.seatunnel.engine.server.service.slot.SlotContext;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.cluster.Address;
@@ -31,12 +30,9 @@ public class TaskExecutionContext {
     private final Task task;
     private final NodeEngineImpl nodeEngine;
 
-    private final SlotContext slotContext;
-
-    public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine, SlotContext slotContext) {
+    public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine) {
         this.task = task;
         this.nodeEngine = nodeEngine;
-        this.slotContext = slotContext;
     }
 
     public <E> InvocationFuture<E> sendToMaster(Operation operation) {
@@ -55,7 +51,4 @@ public class TaskExecutionContext {
         return (T) task;
     }
 
-    public SlotContext getSlotContext() {
-        return slotContext;
-    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 942dc8413..98d865f55 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -29,11 +29,13 @@ import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfiguration;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
-import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
 
+import com.hazelcast.cluster.Address;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.jet.datamodel.Tuple2;
@@ -45,7 +47,9 @@ import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 public class JobMaster implements Runnable {
@@ -69,6 +73,8 @@ public class JobMaster implements Runnable {
 
     private JobImmutableInformation jobImmutableInformation;
 
+    private final Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles;
+
     public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService, @NonNull ResourceManager resourceManager) {
@@ -76,8 +82,8 @@ public class JobMaster implements Runnable {
         this.nodeEngine = nodeEngine;
         this.executorService = executorService;
         flakeIdGenerator =
-                this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
-
+            this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
+        this.ownedSlotProfiles = new ConcurrentHashMap<>();
         this.resourceManager = resourceManager;
     }
 
@@ -127,9 +133,7 @@ public class JobMaster implements Runnable {
                 }
                 jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
             });
-
-            JobScheduler jobScheduler = new PipelineBaseScheduler(physicalPlan, this);
-            jobScheduler.startScheduling();
+            ownedSlotProfiles.putAll(new PipelineBaseScheduler(physicalPlan, this).startScheduling());
         } catch (Throwable e) {
             LOGGER.severe(String.format("Job %s (%s) run error with: %s",
                 physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
@@ -146,6 +150,18 @@ public class JobMaster implements Runnable {
         // TODO Add some job clean operation
     }
 
+    public Address queryTaskGroupAddress(long taskGroupId) {
+        for (Integer pipelineIndex : ownedSlotProfiles.keySet()) {
+            Optional<PhysicalVertex> currentVertex = ownedSlotProfiles.get(pipelineIndex).keySet().stream()
+                .filter(physicalVertex -> physicalVertex.getTaskGroup().getTaskGroupLocation().getTaskGroupId() == taskGroupId)
+                .findFirst();
+            if (currentVertex.isPresent()) {
+                return ownedSlotProfiles.get(pipelineIndex).get(currentVertex.get()).getWorker();
+            }
+        }
+        throw new IllegalArgumentException("can't find task group address from task group id: " + taskGroupId);
+    }
+
     public void cancelJob() {
         this.physicalPlan.cancelJob();
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
index bbe003e39..25aece4a4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.engine.server.scheduler;
 
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import java.util.Map;
+
 public interface JobScheduler {
-    void startScheduling();
+    Map<Integer, Map<PhysicalVertex, SlotProfile>> startScheduling();
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 6c561a709..22a6a0f16 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -41,8 +41,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 public class PipelineBaseScheduler implements JobScheduler {
@@ -61,7 +61,8 @@ public class PipelineBaseScheduler implements JobScheduler {
     }
 
     @Override
-    public void startScheduling() {
+    public Map<Integer, Map<PhysicalVertex, SlotProfile>> startScheduling() {
+        Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles = new ConcurrentHashMap<>();
         if (physicalPlan.turnToRunning()) {
             List<CompletableFuture<Object>> collect = physicalPlan.getPipelineList().stream().map(pipeline -> {
                 if (!pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED)) {
@@ -71,10 +72,13 @@ public class PipelineBaseScheduler implements JobScheduler {
                 Map<PhysicalVertex, SlotProfile> slotProfiles;
                 try {
                     slotProfiles = applyResourceForPipeline(pipeline);
+                    ownedSlotProfiles.put(pipeline.getPipelineIndex(), slotProfiles);
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
-                pipeline.whenComplete((state, error) -> releasePipelineResource(Lists.newArrayList(slotProfiles.values())));
+                pipeline.whenComplete((state, error) -> {
+                    releasePipelineResource(Lists.newArrayList(slotProfiles.values()));
+                });
                 // deploy pipeline
                 return CompletableFuture.supplyAsync(() -> {
                     // TODO before deploy should check slotProfiles is exist, because it maybe can't use when retry.
@@ -95,6 +99,7 @@ public class PipelineBaseScheduler implements JobScheduler {
             throw new JobException(String.format("%s turn to a unexpected state: %s", physicalPlan.getJobFullName(),
                 physicalPlan.getJobStatus()));
         }
+        return ownedSlotProfiles;
     }
 
     private void releasePipelineResource(List<SlotProfile> slotProfiles) {
@@ -108,19 +113,11 @@ public class PipelineBaseScheduler implements JobScheduler {
         try {
             Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
             Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
-            subPlan.getCoordinatorVertexList().forEach(coordinator -> {
-                coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
-            });
+            // TODO If there is no enough resources for tasks, we need add some wait profile
+            subPlan.getCoordinatorVertexList().forEach(coordinator -> futures.put(coordinator, applyResourceForTask(coordinator)));
+
+            subPlan.getPhysicalVertexList().forEach(task -> futures.put(task, applyResourceForTask(task)));
 
-            subPlan.getPhysicalVertexList().forEach(task -> {
-                // TODO If there is no enough resources for tasks, we need add some wait profile
-                if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
-                    // TODO custom resource size
-                    futures.put(task, resourceManager.applyResource(jobId, new ResourceProfile()));
-                } else {
-                    handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
-                }
-            });
             for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
                 try {
                     slotProfiles.put(future.getKey(), future.getValue().get());
@@ -136,10 +133,23 @@ public class PipelineBaseScheduler implements JobScheduler {
         }
     }
 
-    private CompletableFuture<Void> deployTask(PhysicalVertex task, Supplier<Void> deployMethod) {
+    private CompletableFuture<SlotProfile> applyResourceForTask(PhysicalVertex task) {
+        if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+            // TODO custom resource size
+            return resourceManager.applyResource(jobId, new ResourceProfile());
+        } else {
+            handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+            return null;
+        }
+    }
+
+    private CompletableFuture<Void> deployTask(PhysicalVertex task, SlotProfile slotProfile) {
         if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
             // deploy is a time-consuming operation, so we do it async
-            return CompletableFuture.supplyAsync(deployMethod);
+            return CompletableFuture.supplyAsync(() -> {
+                task.deploy(slotProfile);
+                return null;
+            });
         } else {
             handleTaskStateUpdateError(task, ExecutionState.DEPLOYING);
         }
@@ -148,19 +158,16 @@ public class PipelineBaseScheduler implements JobScheduler {
 
     private void deployPipeline(@NonNull SubPlan pipeline, Map<PhysicalVertex, SlotProfile> slotProfiles) {
         if (pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING)) {
-            List<CompletableFuture<?>> deployCoordinatorFuture =
-                pipeline.getCoordinatorVertexList().stream().map(coordinator -> deployTask(coordinator, () -> {
-                    coordinator.deployOnMaster();
-                    return null;
-                })).filter(Objects::nonNull).collect(Collectors.toList());
-
-            List<CompletableFuture<?>> deployTaskFuture =
-                pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task, () -> {
-                    task.deploy(slotProfiles.get(task));
-                    return null;
-                })).filter(Objects::nonNull).collect(Collectors.toList());
 
             try {
+                List<CompletableFuture<?>> deployCoordinatorFuture =
+                    pipeline.getCoordinatorVertexList().stream().map(coordinator -> deployTask(coordinator, slotProfiles.get(coordinator)))
+                        .filter(Objects::nonNull).collect(Collectors.toList());
+
+                List<CompletableFuture<?>> deployTaskFuture =
+                    pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task, slotProfiles.get(task)))
+                        .filter(Objects::nonNull).collect(Collectors.toList());
+
                 deployCoordinatorFuture.addAll(deployTaskFuture);
                 CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
                     deployCoordinatorFuture.toArray(new CompletableFuture[0]));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 3ab25429f..90a0011dd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.server.task.Progress;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
 import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
+import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
@@ -65,6 +66,9 @@ public class TaskDataSerializerHook implements DataSerializerHook {
 
     public static final int CANCEL_TASK_OPERATOR = 13;
 
+    public static final int GET_TASKGROUP_ADDRESS_TYPE = 14;
+
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
             SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
             SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -111,6 +115,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
                     return new DeployTaskOperation();
                 case CANCEL_TASK_OPERATOR:
                     return new CancelTaskOperation();
+                case GET_TASKGROUP_ADDRESS_TYPE:
+                    return new GetTaskGroupAddressOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 5c60d25cf..691cbb815 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -134,8 +134,11 @@ public class DefaultSlotService implements SlotService {
         return new SlotAndWorkerProfile(toWorkerProfile(), profile);
     }
 
-    public SlotContext getSlotContext(int slotID) {
-        return contexts.get(slotID);
+    public SlotContext getSlotContext(SlotProfile slotProfile) {
+        if (!contexts.containsKey(slotProfile.getSlotID())) {
+            throw new WrongTargetSlotException("Unknown slot in slot service, slot profile: " + slotProfile);
+        }
+        return contexts.get(slotProfile.getSlotID());
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
index d975de191..8ec29605e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
@@ -26,7 +26,6 @@ public class SlotContext {
     public SlotContext(int slotID, TaskExecutionService taskExecutionService) {
         this.slotID = slotID;
         this.taskExecutionService = taskExecutionService;
-        this.taskExecutionService.setSlotContext(this);
     }
 
     public int getSlotID() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
index 5d5a5ab14..d39276b26 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
@@ -28,7 +28,7 @@ public interface SlotService {
 
     SlotAndWorkerProfile requestSlot(long jobID, ResourceProfile resourceProfile);
 
-    SlotContext getSlotContext(int slotID);
+    SlotContext getSlotContext(SlotProfile slotProfile);
 
     void releaseSlot(long jobId, SlotProfile slotProfile);
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 939033c3f..7c93f4113 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -27,15 +27,19 @@ import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
+import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
 import org.apache.seatunnel.engine.server.task.record.ClosedSign;
 
+import com.hazelcast.cluster.Address;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 public class SinkFlowLifeCycle<T, StateT> extends AbstractFlowLifeCycle implements OneInputFlowLifeCycle<Record<?>>, InternalCheckpointListener {
 
@@ -53,18 +57,21 @@ public class SinkFlowLifeCycle<T, StateT> extends AbstractFlowLifeCycle implemen
 
     private final TaskLocation taskLocation;
 
-    private final TaskLocation committerTaskID;
+    // Member hosting the committer task; resolved before registerCommitter() only when containCommitter is true.
+    private Address committerTaskAddress;
+
+    private final TaskLocation committerTaskLocation;
 
     private final boolean containCommitter;
 
     public SinkFlowLifeCycle(SinkAction<T, StateT, ?, ?> sinkAction, TaskLocation taskLocation, int indexID,
-                             SeaTunnelTask runningTask, TaskLocation committerTaskID,
+                             SeaTunnelTask runningTask, TaskLocation committerTaskLocation,
                              boolean containCommitter, CompletableFuture<Void> completableFuture) {
         super(runningTask, completableFuture);
         this.sinkAction = sinkAction;
         this.indexID = indexID;
         this.taskLocation = taskLocation;
-        this.committerTaskID = committerTaskID;
+        this.committerTaskLocation = committerTaskLocation;
         this.containCommitter = containCommitter;
     }
 
@@ -78,23 +85,35 @@ public class SinkFlowLifeCycle<T, StateT> extends AbstractFlowLifeCycle implemen
         this.writerStateSerializer = sinkAction.getSink().getWriterStateSerializer();
         this.protoStuffSerializer = new ProtoStuffSerializer();
         states = null;
+        if (containCommitter) {
+            committerTaskAddress = getCommitterTaskAddress();
+        }
         registerCommitter();
     }
 
+    /**
+     * Asks the master (blocking on the reply) which member hosts the committer's task group, so that
+     * the register/unregister operations can be sent directly to that member.
+     */
+    private Address getCommitterTaskAddress() throws ExecutionException, InterruptedException {
+        return (Address) runningTask.getExecutionContext()
+            .sendToMaster(new GetTaskGroupAddressOperation(committerTaskLocation)).get();
+    }
+
     @Override
     public void close() throws IOException {
         super.close();
         writer.close();
         if (containCommitter) {
-            runningTask.getExecutionContext().sendToMaster(new SinkUnregisterOperation(taskLocation,
-                    committerTaskID)).join();
+            runningTask.getExecutionContext().sendToMember(new SinkUnregisterOperation(taskLocation,
+                committerTaskLocation), committerTaskAddress).join();
         }
     }
 
     private void registerCommitter() {
         if (containCommitter) {
-            runningTask.getExecutionContext().sendToMaster(new SinkRegisterOperation(taskLocation,
-                    committerTaskID)).join();
+            runningTask.getExecutionContext().sendToMember(new SinkRegisterOperation(taskLocation,
+                committerTaskLocation), committerTaskAddress).join();
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index d3b5b0218..c2bebbab2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -29,10 +29,12 @@ import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.context.SourceReaderContext;
+import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
 
+import com.hazelcast.cluster.Address;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 
@@ -47,7 +49,10 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
     private static final ILogger LOGGER = Logger.getLogger(SourceFlowLifeCycle.class);
 
     private final SourceAction<T, SplitT, ?> sourceAction;
-    private final TaskLocation enumeratorTaskID;
+    private final TaskLocation enumeratorTaskLocation;
+
+    // Member hosting the enumerator task; target of the register / request-split / no-more-element operations.
+    private Address enumeratorTaskAddress;
 
     private SourceReader<T, SplitT> reader;
 
@@ -56,20 +61,20 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
     private transient org.apache.seatunnel.engine.checkpoint.storage.common.Serializer protoStuffSerializer;
     private final int indexID;
 
-    private final TaskLocation currentTaskID;
+    private final TaskLocation currentTaskLocation;
 
     private SeaTunnelSourceCollector<T> collector;
 
     private volatile boolean closed;
 
     public SourceFlowLifeCycle(SourceAction<T, SplitT, ?> sourceAction, int indexID,
-                               TaskLocation enumeratorTaskID, SeaTunnelTask runningTask,
-                               TaskLocation currentTaskID, CompletableFuture<Void> completableFuture) {
+                               TaskLocation enumeratorTaskLocation, SeaTunnelTask runningTask,
+                               TaskLocation currentTaskLocation, CompletableFuture<Void> completableFuture) {
         super(runningTask, completableFuture);
         this.sourceAction = sourceAction;
         this.indexID = indexID;
-        this.enumeratorTaskID = enumeratorTaskID;
-        this.currentTaskID = currentTaskID;
+        this.enumeratorTaskLocation = enumeratorTaskLocation;
+        this.currentTaskLocation = currentTaskLocation;
     }
 
     public void setCollector(SeaTunnelSourceCollector<T> collector) {
@@ -82,11 +87,20 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
         this.splitSerializer = sourceAction.getSource().getSplitSerializer();
         this.protoStuffSerializer = new ProtoStuffSerializer();
         reader = sourceAction.getSource()
-                .createReader(new SourceReaderContext(indexID, sourceAction.getSource().getBoundedness(), this));
+            .createReader(new SourceReaderContext(indexID, sourceAction.getSource().getBoundedness(), this));
         reader.open();
+        enumeratorTaskAddress = getEnumeratorTaskAddress();
         register();
     }
 
+    /**
+     * Asks the master (blocking on the reply) for the address of the member running the enumerator's task group.
+     */
+    private Address getEnumeratorTaskAddress() throws ExecutionException, InterruptedException {
+        return (Address) runningTask.getExecutionContext()
+            .sendToMaster(new GetTaskGroupAddressOperation(enumeratorTaskLocation)).get();
+    }
+
     @Override
     public void close() throws IOException {
         collector.close();
@@ -104,8 +118,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
         // ready close this reader
         try {
             this.closed = true;
-            runningTask.getExecutionContext().sendToMaster(new SourceNoMoreElementOperation(currentTaskID,
-                    enumeratorTaskID)).get();
+            runningTask.getExecutionContext().sendToMember(new SourceNoMoreElementOperation(currentTaskLocation,
+                enumeratorTaskLocation), enumeratorTaskAddress).get();
         } catch (Exception e) {
             LOGGER.warning("source close failed ", e);
             throw new RuntimeException(e);
@@ -114,8 +128,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
 
     private void register() {
         try {
-            runningTask.getExecutionContext().sendToMaster(new SourceRegisterOperation(currentTaskID,
-                    enumeratorTaskID)).get();
+            runningTask.getExecutionContext().sendToMember(new SourceRegisterOperation(currentTaskLocation,
+                enumeratorTaskLocation), enumeratorTaskAddress).get();
         } catch (InterruptedException | ExecutionException e) {
             LOGGER.warning("source register failed ", e);
             throw new RuntimeException(e);
@@ -124,8 +138,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
 
     public void requestSplit() {
         try {
-            runningTask.getExecutionContext().sendToMaster(new RequestSplitOperation(currentTaskID,
-                    enumeratorTaskID)).get();
+            runningTask.getExecutionContext().sendToMember(new RequestSplitOperation(currentTaskLocation,
+                enumeratorTaskLocation), enumeratorTaskAddress).get();
         } catch (InterruptedException | ExecutionException e) {
             LOGGER.warning("source request split failed", e);
             throw new RuntimeException(e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 155307a6e..a67a43456 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -46,8 +46,9 @@ public class DeployTaskOperation extends AsyncOperation {
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
         SeaTunnelServer server = getService();
-        return server.getSlotService().getSlotContext(slotProfile.getSlotID())
-                .getTaskExecutionService().deployTask(taskImmutableInformation);
+        // Resolve the slot context from the whole SlotProfile (not just its slot ID) and deploy the task into it.
+        return server.getSlotService().getSlotContext(slotProfile)
+            .getTaskExecutionService().deployTask(taskImmutableInformation);
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
similarity index 50%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 155307a6e..df6544140 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -17,55 +17,81 @@
 
 package org.apache.seatunnel.engine.server.task.operation;
 
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.operation.AsyncOperation;
-import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
-import com.hazelcast.internal.nio.IOUtil;
-import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.cluster.Address;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
-import lombok.NonNull;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 
-public class DeployTaskOperation extends AsyncOperation {
-    private Data taskImmutableInformation;
-    private SlotProfile slotProfile;
+/**
+ * Looks up, via the job master, the member {@link Address} hosting the task group that contains
+ * a given {@link TaskLocation}. The resolved address is returned as this operation's response.
+ */
+public class GetTaskGroupAddressOperation extends Operation implements IdentifiedDataSerializable {
 
-    public DeployTaskOperation() {
+    private TaskLocation taskLocation;
+
+    // Address resolved by run() and handed back to the caller through getResponse().
+    private Address response;
+
+    public GetTaskGroupAddressOperation() {
     }
 
-    public DeployTaskOperation(@NonNull SlotProfile slotProfile, @NonNull Data taskImmutableInformation) {
-        this.taskImmutableInformation = taskImmutableInformation;
-        this.slotProfile = slotProfile;
+    public GetTaskGroupAddressOperation(TaskLocation taskLocation) {
+        this.taskLocation = taskLocation;
     }
 
     @Override
-    protected PassiveCompletableFuture<?> doRun() throws Exception {
+    public void run() throws Exception {
         SeaTunnelServer server = getService();
-        return server.getSlotService().getSlotContext(slotProfile.getSlotID())
-                .getTaskExecutionService().deployTask(taskImmutableInformation);
+        // A NullPointerException from the lookup (presumably the job master not being available yet) is
+        // retried via RetryUtils, configured with Constant.OPERATION_RETRY_TIME / OPERATION_RETRY_SLEEP.
+        response = RetryUtils.retryWithException(() -> server.getJobMaster(taskLocation.getJobId())
+                .queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()),
+            new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+                exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
-    public int getClassId() {
-        return TaskDataSerializerHook.DEPLOY_TASK_OPERATOR;
+    public Object getResponse() {
+        return response;
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        IOUtil.writeData(out, taskImmutableInformation);
-        out.writeObject(slotProfile);
+        out.writeObject(taskLocation);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        taskImmutableInformation = IOUtil.readData(in);
-        slotProfile = in.readObject();
+        // The no-arg constructor leaves taskLocation null, so it must be assigned here, not filled in place.
+        taskLocation = in.readObject();
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        // NOTE(review): CLOSE_REQUEST_TYPE looks like another operation's id; this class probably needs its
+        // own id registered in TaskDataSerializerHook's factory so it deserializes as itself - confirm.
+        return TaskDataSerializerHook.CLOSE_REQUEST_TYPE;
     }
 }