You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/08/30 04:16:39 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][Task] Use TaskGroupLocation instead of TaskGroupID (#2559)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new ab4561069 [Engine][Task] Use TaskGroupLocation instead of TaskGroupID (#2559)
ab4561069 is described below

commit ab4561069ce963ac0d466af6dcf7edef2c2e2ced
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Tue Aug 30 12:16:34 2022 +0800

    [Engine][Task] Use TaskGroupLocation instead of TaskGroupID (#2559)
    
    * add TaskGroupInfo
    
    * add TaskGroupInfo
    
    * add TaskGroupInfo
    
    * fix checkstyle
    
    * rename TaskGroupInfo to TaskGroupLocation
    
    * resolved conflicts
---
 .../engine/server/TaskExecutionService.java        | 44 +++++++++--------
 .../server/dag/physical/PhysicalPlanGenerator.java | 29 +++++++-----
 .../engine/server/dag/physical/PhysicalVertex.java | 22 +++++----
 .../server/execution/TaskExecutionState.java       | 10 ++--
 .../engine/server/execution/TaskGroup.java         |  2 +-
 .../server/execution/TaskGroupDefaultImpl.java     | 10 ++--
 .../engine/server/execution/TaskGroupLocation.java | 55 ++++++++++++++++++++++
 .../engine/server/execution/TaskLocation.java      | 31 ++++++------
 .../server/resourcemanager/ResourceManager.java    |  6 ++-
 .../resourcemanager/SimpleResourceManager.java     | 19 ++++----
 .../server/scheduler/PipelineBaseScheduler.java    |  4 +-
 .../task/group/TaskGroupWithIntermediateQueue.java |  5 +-
 .../server/task/operation/CancelTaskOperation.java | 13 ++---
 .../task/operation/sink/SinkRegisterOperation.java |  2 +-
 .../operation/sink/SinkUnregisterOperation.java    |  2 +-
 .../operation/source/AssignSplitOperation.java     |  2 +-
 .../operation/source/CloseRequestOperation.java    |  2 +-
 .../operation/source/RequestSplitOperation.java    |  2 +-
 .../source/SourceNoMoreElementOperation.java       |  2 +-
 .../operation/source/SourceRegisterOperation.java  |  2 +-
 .../engine/server/TaskExecutionServiceTest.java    | 21 +++++----
 21 files changed, 182 insertions(+), 103 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 1bd2b4b9e..f739fb192 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 
@@ -78,8 +79,8 @@ public class TaskExecutionService {
     private final ExecutorService executorService = newCachedThreadPool(new BlockingTaskThreadFactory());
     private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
     // key: TaskID
-    private final ConcurrentMap<Long, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
-    private final ConcurrentMap<Long, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
 
     public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -96,8 +97,8 @@ public class TaskExecutionService {
         executorService.shutdownNow();
     }
 
-    public TaskGroupContext getExecutionContext(long taskGroupId) {
-        return executionContexts.get(taskGroupId);
+    public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
+        return executionContexts.get(taskGroupLocation);
     }
 
     private void submitThreadShareTask(TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
@@ -122,7 +123,7 @@ public class TaskExecutionService {
         uncheckRun(startedLatch::await);
     }
 
-    public PassiveCompletableFuture<TaskExecutionState> deployTask(
+    public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(
         @NonNull Data taskImmutableInformation
     ) {
         CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
@@ -140,13 +141,16 @@ public class TaskExecutionService {
             } else {
                 taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
             }
+            if (executionContexts.containsKey(taskGroup.getTaskGroupInfo())) {
+                throw new RuntimeException(String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupInfo()));
+            }
             return deployLocalTask(taskGroup, resultFuture);
         } catch (Throwable t) {
             logger.severe(String.format("TaskGroupID : %s  deploy error with Exception: %s",
-                taskGroup != null ? taskGroup.getId() : -1,
+                taskGroup != null && taskGroup.getTaskGroupInfo() != null ? taskGroup.getTaskGroupInfo().toString() : "taskGroupInfo is null",
                 ExceptionUtils.getMessage(t)));
             resultFuture.complete(
-                new TaskExecutionState(taskGroup != null ? taskGroup.getId() : -1, ExecutionState.FAILED, t));
+                new TaskExecutionState(taskGroup != null && taskGroup.getTaskGroupInfo() != null ? taskGroup.getTaskGroupInfo() : null, ExecutionState.FAILED, t));
         }
         return new PassiveCompletableFuture<>(resultFuture);
     }
@@ -173,8 +177,8 @@ public class TaskExecutionService {
             submitThreadShareTask(executionTracker, byCooperation.get(true));
             submitBlockingTask(executionTracker, byCooperation.get(false));
             taskGroup.setTasksContext(taskExecutionContextMap);
-            executionContexts.put(taskGroup.getId(), new TaskGroupContext(taskGroup));
-            cancellationFutures.put(taskGroup.getId(), cancellationFuture);
+            executionContexts.put(taskGroup.getTaskGroupInfo(), new TaskGroupContext(taskGroup));
+            cancellationFutures.put(taskGroup.getTaskGroupInfo(), cancellationFuture);
         } catch (Throwable t) {
             logger.severe(ExceptionUtils.getMessage(t));
             resultFuture.completeExceptionally(t);
@@ -186,14 +190,14 @@ public class TaskExecutionService {
      * JobMaster call this method to cancel a task, and then {@link TaskExecutionService} cancel this task and send the
      * {@link TaskExecutionState} to JobMaster.
      *
-     * @param taskId TaskGroup.getId()
+     * @param taskGroupLocation TaskGroup.getTaskGroupInfo()
      */
-    public void cancelTaskGroup(long taskId) {
-        logger.info(String.format("Task (%s) need cancel.", taskId));
-        if (cancellationFutures.containsKey(taskId)) {
-            cancellationFutures.get(taskId).cancel(false);
+    public void cancelTaskGroup(TaskGroupLocation taskGroupLocation) {
+        logger.info(String.format("Task (%s) need cancel.", taskGroupLocation));
+        if (cancellationFutures.containsKey(taskGroupLocation)) {
+            cancellationFutures.get(taskGroupLocation).cancel(false);
         } else {
-            logger.warning(String.format("need cancel taskId : %s is not exist", taskId));
+            logger.warning(String.format("need cancel taskId : %s is not exist", taskGroupLocation));
         }
 
     }
@@ -379,14 +383,16 @@ public class TaskExecutionService {
 
         void taskDone() {
             if (completionLatch.decrementAndGet() == 0) {
-                executionContexts.remove(taskGroup.getId());
+                TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupInfo();
+                executionContexts.remove(taskGroupLocation);
+                cancellationFutures.remove(taskGroupLocation);
                 Throwable ex = executionException.get();
                 if (ex == null) {
-                    future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FINISHED, null));
+                    future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FINISHED, null));
                 } else if (isCancel.get()) {
-                    future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.CANCELED, null));
+                    future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.CANCELED, null));
                 } else {
-                    future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, ex));
+                    future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, ex));
                 }
             }
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 18a187913..f046f65f6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -42,6 +42,7 @@ import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlo
 import org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
@@ -178,17 +179,18 @@ public class PhysicalPlanGenerator {
                 // if sinkAggregatedCommitter is empty, don't create task.
                 if (sinkAggregatedCommitter.isPresent()) {
                     long taskGroupID = idGenerator.getNextId();
+                    TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
                     SinkAggregatedCommitterTask<?> t =
                         new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
-                            new TaskLocation(taskGroupID, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s,
+                            new TaskLocation(taskGroupLocation, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s,
                             sinkAggregatedCommitter.get());
-                    committerTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
+                    committerTaskIDMap.put(s, new TaskLocation(taskGroupLocation, t.getTaskID()));
 
                     return new PhysicalVertex(idGenerator.getNextId(),
                         atomicInteger.incrementAndGet(),
                         executorService,
                         collect.size(),
-                        new TaskGroupDefaultImpl(taskGroupID, s.getName() + "-AggregatedCommitterTask",
+                        new TaskGroupDefaultImpl(taskGroupLocation, s.getName() + "-AggregatedCommitterTask",
                             Lists.newArrayList(t)),
                         flakeIdGenerator,
                         pipelineIndex,
@@ -215,15 +217,16 @@ public class PhysicalPlanGenerator {
                 long taskGroupIDPrefix = idGenerator.getNextId();
                 for (int i = 0; i < flow.getAction().getParallelism(); i++) {
                     long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
+                    TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
                     setFlowConfig(flow, i);
                     SeaTunnelTask seaTunnelTask = new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
-                        new TaskLocation(taskGroupID, mixIDPrefixAndIndex(taskIDPrefix, i)), i, flow);
+                        new TaskLocation(taskGroupLocation, mixIDPrefixAndIndex(taskIDPrefix, i)), i, flow);
 
                     t.add(new PhysicalVertex(idGenerator.getNextId(),
                         i,
                         executorService,
                         flow.getAction().getParallelism(),
-                        new TaskGroupDefaultImpl(taskGroupID, flow.getAction().getName() +
+                        new TaskGroupDefaultImpl(taskGroupLocation, flow.getAction().getName() +
                             "-PartitionTransformTask",
                             Lists.newArrayList(seaTunnelTask)),
                         flakeIdGenerator,
@@ -245,15 +248,16 @@ public class PhysicalPlanGenerator {
 
         return sources.stream().map(s -> {
             long taskGroupID = idGenerator.getNextId();
+            TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
             SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
-                new TaskLocation(taskGroupID, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s);
-            enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
+                new TaskLocation(taskGroupLocation, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s);
+            enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupLocation, t.getTaskID()));
 
             return new PhysicalVertex(idGenerator.getNextId(),
                 atomicInteger.incrementAndGet(),
                 executorService,
                 sources.size(),
-                new TaskGroupDefaultImpl(taskGroupID, s.getName() + "-SplitEnumerator",
+                new TaskGroupDefaultImpl(taskGroupLocation, s.getName() + "-SplitEnumerator",
                     Lists.newArrayList(t)),
                 flakeIdGenerator,
                 pipelineIndex,
@@ -282,6 +286,7 @@ public class PhysicalPlanGenerator {
                 for (int i = 0; i < flow.getAction().getParallelism(); i++) {
                     int finalParallelismIndex = i;
                     long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
+                    TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
                     List<SeaTunnelTask> taskList =
                         flows.stream().map(f -> {
                             setFlowConfig(f, finalParallelismIndex);
@@ -289,12 +294,12 @@ public class PhysicalPlanGenerator {
                                 flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> idGenerator.getNextId());
                             if (f instanceof PhysicalExecutionFlow) {
                                 return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
-                                    new TaskLocation(taskGroupID,
+                                    new TaskLocation(taskGroupLocation,
                                         mixIDPrefixAndIndex(taskIDPrefix, finalParallelismIndex)),
                                     finalParallelismIndex, f);
                             } else {
                                 return new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
-                                    new TaskLocation(taskGroupID,
+                                    new TaskLocation(taskGroupLocation,
                                         mixIDPrefixAndIndex(taskIDPrefix, finalParallelismIndex)),
                                     finalParallelismIndex, f);
                             }
@@ -308,7 +313,7 @@ public class PhysicalPlanGenerator {
                             i,
                             executorService,
                             flow.getAction().getParallelism(),
-                            new TaskGroupWithIntermediateQueue(taskGroupID, flow.getAction().getName() +
+                            new TaskGroupWithIntermediateQueue(taskGroupLocation, flow.getAction().getName() +
                                 "-SourceTask",
                                 taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
                             flakeIdGenerator,
@@ -323,7 +328,7 @@ public class PhysicalPlanGenerator {
                             i,
                             executorService,
                             flow.getAction().getParallelism(),
-                            new TaskGroupDefaultImpl(taskGroupID, flow.getAction().getName() +
+                            new TaskGroupDefaultImpl(taskGroupLocation, flow.getAction().getName() +
                                 "-SourceTask",
                                 taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
                             flakeIdGenerator,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index d1fe0844e..15e7f843c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
 import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
@@ -54,6 +55,8 @@ public class PhysicalVertex {
 
     private final long physicalVertexId;
 
+    private final TaskGroupLocation taskGroupLocation;
+
     /**
      * the index of PhysicalVertex
      */
@@ -145,6 +148,7 @@ public class PhysicalVertex {
                 subTaskGroupIndex + 1,
                 parallelism);
         this.taskFuture = new CompletableFuture<>();
+        this.taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, physicalVertexId);
     }
 
     public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
@@ -176,20 +180,20 @@ public class PhysicalVertex {
                     if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
                         turnToEndState(ExecutionState.CANCELED);
                         taskFuture.complete(
-                            new TaskExecutionState(this.physicalVertexId, ExecutionState.CANCELED, null));
+                            new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null));
                     } else {
                         turnToEndState(ExecutionState.FAILED);
-                        taskFuture.complete(new TaskExecutionState(this.physicalVertexId, ExecutionState.FAILED,
+                        taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
                             new JobException(String.format("%s turn to a unexpected state: %s, make it Failed",
                                 this.getTaskFullName(), executionState.get()))));
                     }
                 }
             } else if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
                 turnToEndState(ExecutionState.CANCELED);
-                taskFuture.complete(new TaskExecutionState(this.physicalVertexId, executionState.get(), null));
+                taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(), null));
             } else {
                 turnToEndState(ExecutionState.FAILED);
-                taskFuture.complete(new TaskExecutionState(this.physicalVertexId, executionState.get(),
+                taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(),
                     new JobException(String.format("%s turn to a unexpected state"))));
             }
 
@@ -199,7 +203,7 @@ public class PhysicalVertex {
                 ExceptionUtils.getMessage(th)));
             turnToEndState(ExecutionState.FAILED);
             taskFuture.complete(
-                new TaskExecutionState(this.physicalVertexId, ExecutionState.FAILED, th));
+                new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, th));
         }
 
         if (waitForCompleteByExecutionService == null) {
@@ -211,7 +215,7 @@ public class PhysicalVertex {
                 if (t != null) {
                     LOGGER.severe("An unexpected error occurred while the task was running", t);
                     taskFuture.complete(
-                        new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED,
+                        new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
                             t));
                 } else {
                     turnToEndState(v.getExecutionState());
@@ -231,7 +235,7 @@ public class PhysicalVertex {
                 LOGGER.severe(
                     String.format("%s end with Exception: %s", this.taskFullName, ExceptionUtils.getMessage(th)));
                 turnToEndState(ExecutionState.FAILED);
-                v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, th);
+                v = new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, th);
                 taskFuture.complete(v);
             }
         });
@@ -309,7 +313,7 @@ public class PhysicalVertex {
     public void cancel() {
         if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) ||
             updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED)) {
-            taskFuture.complete(new TaskExecutionState(this.physicalVertexId, ExecutionState.CANCELED, null));
+            taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null));
         } else if (updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELING)) {
             // do nothing, because even if task is deployed to TaskExecutionService, we can do the cancel in deploy method
         } else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) {
@@ -325,7 +329,7 @@ public class PhysicalVertex {
             try {
                 i++;
                 nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                        new CancelTaskOperation(taskGroup.getId()),
+                        new CancelTaskOperation(taskGroup.getTaskGroupInfo()),
                         currentExecutionAddress)
                     .invoke().get();
                 return;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
index 72ed4526e..e31fdbcec 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
@@ -21,14 +21,14 @@ import java.io.Serializable;
 
 public class TaskExecutionState implements Serializable {
 
-    private final long taskExecutionId;
+    private final TaskGroupLocation taskGroupLocation;
 
     private final ExecutionState executionState;
 
     private Throwable throwable;
 
-    public TaskExecutionState(long taskExecutionId, ExecutionState executionState, Throwable throwable) {
-        this.taskExecutionId = taskExecutionId;
+    public TaskExecutionState(TaskGroupLocation taskGroupLocation, ExecutionState executionState, Throwable throwable) {
+        this.taskGroupLocation = taskGroupLocation;
         this.executionState = executionState;
         this.throwable = throwable;
     }
@@ -41,7 +41,7 @@ public class TaskExecutionState implements Serializable {
         return throwable;
     }
 
-    public long getTaskExecutionId() {
-        return taskExecutionId;
+    public TaskGroupLocation getTaskGroupInfo() {
+        return taskGroupLocation;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index ec14bdeb0..cee3eeaf3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -23,7 +23,7 @@ import java.util.Map;
 
 public interface TaskGroup extends Serializable {
 
-    long getId();
+    TaskGroupLocation getTaskGroupInfo();
 
     void init();
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
index 76b95ea26..d0d62364e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
@@ -23,14 +23,14 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class TaskGroupDefaultImpl implements TaskGroup {
-    private final long id;
+    private final TaskGroupLocation taskGroupLocation;
 
     private final String taskGroupName;
 
     private final Map<Long, Task> tasks;
 
-    public TaskGroupDefaultImpl(long id, String taskGroupName, Collection<Task> tasks) {
-        this.id = id;
+    public TaskGroupDefaultImpl(TaskGroupLocation taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
+        this.taskGroupLocation = taskGroupLocation;
         this.taskGroupName = taskGroupName;
         this.tasks = tasks.stream().collect(Collectors.toMap(Task::getTaskID, Function.identity()));
     }
@@ -40,8 +40,8 @@ public class TaskGroupDefaultImpl implements TaskGroup {
     }
 
     @Override
-    public long getId() {
-        return id;
+    public TaskGroupLocation getTaskGroupInfo() {
+        return taskGroupLocation;
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
new file mode 100644
index 000000000..71c63319d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class TaskGroupLocation implements Serializable {
+    private final long jobId;
+
+    private final long pipelineId;
+
+    private final long taskGroupId;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        TaskGroupLocation that = (TaskGroupLocation) o;
+
+        return new EqualsBuilder().append(jobId, that.jobId).append(pipelineId, that.pipelineId).append(taskGroupId, that.taskGroupId).isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder(17, 37).append(jobId).append(pipelineId).append(taskGroupId).toHashCode();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index d361037bd..cd126b19e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -22,30 +22,31 @@ import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Objects;
 
 public class TaskLocation implements IdentifiedDataSerializable, Serializable {
 
-    private long taskGroupID;
+    private TaskGroupLocation taskGroupLocation;
     private long taskID;
 
     public TaskLocation() {
     }
 
-    public TaskLocation(long taskGroupID, long taskID) {
-        this.taskGroupID = taskGroupID;
+    public TaskLocation(TaskGroupLocation taskGroupLocation, long taskID) {
+        this.taskGroupLocation = taskGroupLocation;
         this.taskID = taskID;
     }
 
-    public long getTaskGroupID() {
-        return taskGroupID;
+    public TaskGroupLocation getTaskGroupLocation() {
+        return taskGroupLocation;
     }
 
-    public void setTaskGroupID(long taskGroupID) {
-        this.taskGroupID = taskGroupID;
+    public void setTaskGroupLocation(TaskGroupLocation taskGroupLocation) {
+        this.taskGroupLocation = taskGroupLocation;
     }
 
     public long getTaskID() {
@@ -68,22 +69,22 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
 
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
-        out.writeLong(taskGroupID);
+        out.writeObject(taskGroupLocation);
         out.writeLong(taskID);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
-        taskGroupID = in.readLong();
+        taskGroupLocation = in.readObject();
         taskID = in.readLong();
     }
 
     @Override
     public String toString() {
         return "TaskLocation{" +
-                "taskGroupID=" + taskGroupID +
-                ", taskID=" + taskID +
-                '}';
+            "taskGroupLocation=" + taskGroupLocation +
+            ", taskID=" + taskID +
+            '}';
     }
 
     @Override
@@ -95,11 +96,11 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
             return false;
         }
         TaskLocation that = (TaskLocation) o;
-        return taskGroupID == that.taskGroupID && taskID == that.taskID;
+        return new EqualsBuilder().append(taskID, that.taskID).append(taskGroupLocation, that.taskGroupLocation).isEquals();
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(taskGroupID, taskID);
+        return new HashCodeBuilder(17, 37).append(taskGroupLocation).append(taskID).toHashCode();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index f849dc9a7..cb459944c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -17,12 +17,14 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager;
 
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
 import com.hazelcast.cluster.Address;
 import lombok.NonNull;
 
 public interface ResourceManager {
-    Address applyForResource(long jobId, long taskId);
+    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
 
     @NonNull
-    Address getAppliedResource(long jobId, long taskId);
+    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
index 59bf5b204..28bbb81d1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.resourcemanager;
 
 import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.spi.impl.NodeEngine;
@@ -31,7 +32,7 @@ import java.util.Map;
 public class SimpleResourceManager implements ResourceManager {
 
     // TODO We may need more detailed resource define, instead of the resource definition method of only Address.
-    private Map<Long, Map<Long, Address>> physicalVertexIdAndResourceMap = new HashMap<>();
+    private Map<Long, Map<TaskGroupLocation, Address>> physicalVertexIdAndResourceMap = new HashMap<>();
 
     private final NodeEngine nodeEngine;
 
@@ -41,14 +42,14 @@ public class SimpleResourceManager implements ResourceManager {
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @Override
-    public Address applyForResource(long jobId, long taskId) {
-        Map<Long, Address> jobAddressMap =
+    public Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation) {
+        Map<TaskGroupLocation, Address> jobAddressMap =
             physicalVertexIdAndResourceMap.computeIfAbsent(jobId, k -> new HashMap<>());
 
         Address localhost =
-            jobAddressMap.putIfAbsent(taskId, nodeEngine.getThisAddress());
+            jobAddressMap.putIfAbsent(taskGroupLocation, nodeEngine.getThisAddress());
         if (null == localhost) {
-            localhost = jobAddressMap.get(taskId);
+            localhost = jobAddressMap.get(taskGroupLocation);
         }
 
         return localhost;
@@ -57,13 +58,13 @@ public class SimpleResourceManager implements ResourceManager {
 
     @Override
     @NonNull
-    public Address getAppliedResource(long jobId, long taskId) {
-        Map<Long, Address> longAddressMap = physicalVertexIdAndResourceMap.get(jobId);
+    public Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation) {
+        Map<TaskGroupLocation, Address> longAddressMap = physicalVertexIdAndResourceMap.get(jobId);
         if (null == longAddressMap || longAddressMap.isEmpty()) {
             throw new JobException(
-                String.format("Job %s, Task %s can not found applied resource.", jobId, taskId));
+                String.format("Job %s, Task %s can not found applied resource.", jobId, taskGroupLocation));
         }
 
-        return longAddressMap.get(taskId);
+        return longAddressMap.get(taskGroupLocation);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 8d42737d2..797de9365 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -100,7 +100,7 @@ public class PipelineBaseScheduler implements JobScheduler {
             // TODO If there is no enough resources for tasks, we need add some wait profile
             if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
                 resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                    task.getTaskGroup().getId());
+                    task.getTaskGroup().getTaskGroupInfo());
             } else {
                 handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
             }
@@ -115,7 +115,7 @@ public class PipelineBaseScheduler implements JobScheduler {
             return CompletableFuture.supplyAsync(() -> {
                 task.deploy(
                     resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                        task.getTaskGroup().getId()));
+                        task.getTaskGroup().getTaskGroupInfo()));
                 return null;
             });
         } else {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
index 08d5de1c1..682b11640 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.group;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 
 import java.util.Collection;
@@ -32,8 +33,8 @@ public class TaskGroupWithIntermediateQueue extends TaskGroupDefaultImpl {
 
     public static final int QUEUE_SIZE = 1000;
 
-    public TaskGroupWithIntermediateQueue(long id, String taskGroupName, Collection<Task> tasks) {
-        super(id, taskGroupName, tasks);
+    public TaskGroupWithIntermediateQueue(TaskGroupLocation taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
+        super(taskGroupLocation, taskGroupName, tasks);
     }
 
     private Map<Long, BlockingQueue<Record<?>>> blockingQueueCache = null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
index 37f45c17b..7c75f0960 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.task.operation;
 
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -33,13 +34,13 @@ import java.io.IOException;
  * notified JobMaster
  */
 public class CancelTaskOperation extends Operation implements IdentifiedDataSerializable {
-    private long taskGroupId;
+    private TaskGroupLocation taskGroupLocation;
 
     public CancelTaskOperation() {
     }
 
-    public CancelTaskOperation(long taskGroupId) {
-        this.taskGroupId = taskGroupId;
+    public CancelTaskOperation(TaskGroupLocation taskGroupLocation) {
+        this.taskGroupLocation = taskGroupLocation;
     }
 
     @Override
@@ -55,7 +56,7 @@ public class CancelTaskOperation extends Operation implements IdentifiedDataSeri
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        server.getTaskExecutionService().cancelTaskGroup(taskGroupId);
+        server.getTaskExecutionService().cancelTaskGroup(taskGroupLocation);
     }
 
     @Override
@@ -66,12 +67,12 @@ public class CancelTaskOperation extends Operation implements IdentifiedDataSeri
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        out.writeLong(taskGroupId);
+        out.writeObject(taskGroupLocation);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        taskGroupId = in.readLong();
+        taskGroupLocation = in.readObject();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index 70f0365de..ccbc8d519 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -56,7 +56,7 @@ public class SinkRegisterOperation extends Operation implements IdentifiedDataSe
         SinkAggregatedCommitterTask<?> task = null;
         for (int i = 0; i < RETRY_TIME; i++) {
             try {
-                task = server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupID())
+                task = server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation())
                         .getTaskGroup().getTask(committerTaskID.getTaskID());
                 break;
             } catch (NullPointerException e) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
index cfea0b586..558593551 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
@@ -46,7 +46,7 @@ public class SinkUnregisterOperation extends Operation implements IdentifiedData
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         SinkAggregatedCommitterTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupID()).getTaskGroup().getTask(committerTaskID.getTaskID());
+                server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
         task.receivedWriterUnregister(currentTaskID);
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 8fba46f6f..ddbe9ae59 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -51,7 +51,7 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
             SourceSeaTunnelTask<?, SplitT> task =
-                server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupID()).getTaskGroup()
+                server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupLocation()).getTaskGroup()
                     .getTask(taskID.getTaskID());
             task.receivedSourceSplit(splits);
             return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
index 18537e1f4..f25b895b3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
@@ -47,7 +47,7 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
             SourceSeaTunnelTask<?, ?> task =
-                server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupID())
+                server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupLocation())
                     .getTaskGroup().getTask(readerLocation.getTaskID());
             task.close();
             return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 1d6edefd5..d214368a1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -51,7 +51,7 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
 
         RetryUtils.retryWithException(() -> {
             SourceSplitEnumeratorTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
+                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
                     .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
             task.requestSplit(taskID.getTaskID());
             return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 626ff1911..58b09146e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -49,7 +49,7 @@ public class SourceNoMoreElementOperation extends Operation implements Identifie
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
             SourceSplitEnumeratorTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
+                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
                     .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
             task.readerFinished(currentTaskID.getTaskID());
             return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 1f89f1f9d..a2f08bf70 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -57,7 +57,7 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
         Address readerAddress = getCallerAddress();
         RetryUtils.retryWithException(() -> {
             SourceSplitEnumeratorTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID()).getTaskGroup()
+                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation()).getTaskGroup()
                     .getTask(enumeratorTaskID.getTaskID());
             task.receivedReader(readerTaskID, readerAddress);
             return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 26c556f1d..d6d371e49 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TestTask;
 
 import com.google.common.collect.Lists;
@@ -50,6 +51,8 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
 
     FlakeIdGenerator flakeIdGenerator;
     long taskRunTime = 2000;
+    long jobId = 10001;
+    long pipeLineId = 100001;
 
     @Before
     @Override
@@ -87,10 +90,10 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
         TestTask testTask1 = new TestTask(stop, logger, sleepTime, true);
         TestTask testTask2 = new TestTask(stop, logger, sleepTime, false);
 
-        long taskGroupId = flakeIdGenerator.newId();
-        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(taskGroupId, "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
+        TaskGroupDefaultImpl ts = new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(testTask1, testTask2));
+        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(ts, new CompletableFuture<>());
 
-        taskExecutionService.cancelTaskGroup(taskGroupId);
+        taskExecutionService.cancelTaskGroup(ts.getTaskGroupInfo());
 
         await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> assertEquals(CANCELED, completableFuture.get().getExecutionState()));
@@ -106,7 +109,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
         TestTask testTask1 = new TestTask(stop, logger, sleepTime, true);
         TestTask testTask2 = new TestTask(stop, logger, sleepTime, false);
 
-        final CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
+        final CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
         completableFuture.whenComplete((unused, throwable) -> futureMark.set(true));
         stop.set(true);
 
@@ -133,7 +136,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
 
         TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
-        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "t1", Lists.newArrayList(criticalTask)), new CompletableFuture<>());
+        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "t1", Lists.newArrayList(criticalTask)), new CompletableFuture<>());
 
         // Run it for a while
         Thread.sleep(taskRunTime);
@@ -178,11 +181,11 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
         tasks.addAll(lowLagTask);
         Collections.shuffle(tasks);
 
-        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(tasks)), new CompletableFuture<>());
+        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(tasks)), new CompletableFuture<>());
 
-        CompletableFuture<TaskExecutionState> t1c = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "t1", Lists.newArrayList(t1)), new CompletableFuture<>());
+        CompletableFuture<TaskExecutionState> t1c = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "t1", Lists.newArrayList(t1)), new CompletableFuture<>());
 
-        CompletableFuture<TaskExecutionState> t2c = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "t2", Lists.newArrayList(t2)), new CompletableFuture<>());
+        CompletableFuture<TaskExecutionState> t2c = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "t2", Lists.newArrayList(t2)), new CompletableFuture<>());
 
         Thread.sleep(taskRunTime);
 
@@ -221,7 +224,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
         tasks.addAll(lowLagTask);
         Collections.shuffle(tasks);
 
-        TaskGroupDefaultImpl taskGroup = new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(tasks));
+        TaskGroupDefaultImpl taskGroup = new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(tasks));
 
         logger.info("task size is : " + taskGroup.getTasks().size());