You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/08/30 04:16:39 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][Task] Use TaskGroupLocation instead of TaskGroupID (#2559)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new ab4561069 [Engine][Task] Use TaskGroupLocation instead of TaskGroupID (#2559)
ab4561069 is described below
commit ab4561069ce963ac0d466af6dcf7edef2c2e2ced
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Tue Aug 30 12:16:34 2022 +0800
[Engine][Task] Use TaskGroupLocation instead of TaskGroupID (#2559)
* add TaskGroupInfo
* add TaskGroupInfo
* add TaskGroupInfo
* fix checkstyle
* rename TaskGroupInfo to TaskGroupLocation
* resolved conflicts
---
.../engine/server/TaskExecutionService.java | 44 +++++++++--------
.../server/dag/physical/PhysicalPlanGenerator.java | 29 +++++++-----
.../engine/server/dag/physical/PhysicalVertex.java | 22 +++++----
.../server/execution/TaskExecutionState.java | 10 ++--
.../engine/server/execution/TaskGroup.java | 2 +-
.../server/execution/TaskGroupDefaultImpl.java | 10 ++--
.../engine/server/execution/TaskGroupLocation.java | 55 ++++++++++++++++++++++
.../engine/server/execution/TaskLocation.java | 31 ++++++------
.../server/resourcemanager/ResourceManager.java | 6 ++-
.../resourcemanager/SimpleResourceManager.java | 19 ++++----
.../server/scheduler/PipelineBaseScheduler.java | 4 +-
.../task/group/TaskGroupWithIntermediateQueue.java | 5 +-
.../server/task/operation/CancelTaskOperation.java | 13 ++---
.../task/operation/sink/SinkRegisterOperation.java | 2 +-
.../operation/sink/SinkUnregisterOperation.java | 2 +-
.../operation/source/AssignSplitOperation.java | 2 +-
.../operation/source/CloseRequestOperation.java | 2 +-
.../operation/source/RequestSplitOperation.java | 2 +-
.../source/SourceNoMoreElementOperation.java | 2 +-
.../operation/source/SourceRegisterOperation.java | 2 +-
.../engine/server/TaskExecutionServiceTest.java | 21 +++++----
21 files changed, 182 insertions(+), 103 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 1bd2b4b9e..f739fb192 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
@@ -78,8 +79,8 @@ public class TaskExecutionService {
private final ExecutorService executorService = newCachedThreadPool(new BlockingTaskThreadFactory());
private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
// key: TaskID
- private final ConcurrentMap<Long, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
- private final ConcurrentMap<Long, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
+ private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -96,8 +97,8 @@ public class TaskExecutionService {
executorService.shutdownNow();
}
- public TaskGroupContext getExecutionContext(long taskGroupId) {
- return executionContexts.get(taskGroupId);
+ public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
+ return executionContexts.get(taskGroupLocation);
}
private void submitThreadShareTask(TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
@@ -122,7 +123,7 @@ public class TaskExecutionService {
uncheckRun(startedLatch::await);
}
- public PassiveCompletableFuture<TaskExecutionState> deployTask(
+ public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(
@NonNull Data taskImmutableInformation
) {
CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
@@ -140,13 +141,16 @@ public class TaskExecutionService {
} else {
taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
}
+ if (executionContexts.containsKey(taskGroup.getTaskGroupInfo())) {
+ throw new RuntimeException(String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupInfo()));
+ }
return deployLocalTask(taskGroup, resultFuture);
} catch (Throwable t) {
logger.severe(String.format("TaskGroupID : %s deploy error with Exception: %s",
- taskGroup != null ? taskGroup.getId() : -1,
+ taskGroup != null && taskGroup.getTaskGroupInfo() != null ? taskGroup.getTaskGroupInfo().toString() : "taskGroupInfo is null",
ExceptionUtils.getMessage(t)));
resultFuture.complete(
- new TaskExecutionState(taskGroup != null ? taskGroup.getId() : -1, ExecutionState.FAILED, t));
+ new TaskExecutionState(taskGroup != null && taskGroup.getTaskGroupInfo() != null ? taskGroup.getTaskGroupInfo() : null, ExecutionState.FAILED, t));
}
return new PassiveCompletableFuture<>(resultFuture);
}
@@ -173,8 +177,8 @@ public class TaskExecutionService {
submitThreadShareTask(executionTracker, byCooperation.get(true));
submitBlockingTask(executionTracker, byCooperation.get(false));
taskGroup.setTasksContext(taskExecutionContextMap);
- executionContexts.put(taskGroup.getId(), new TaskGroupContext(taskGroup));
- cancellationFutures.put(taskGroup.getId(), cancellationFuture);
+ executionContexts.put(taskGroup.getTaskGroupInfo(), new TaskGroupContext(taskGroup));
+ cancellationFutures.put(taskGroup.getTaskGroupInfo(), cancellationFuture);
} catch (Throwable t) {
logger.severe(ExceptionUtils.getMessage(t));
resultFuture.completeExceptionally(t);
@@ -186,14 +190,14 @@ public class TaskExecutionService {
* JobMaster call this method to cancel a task, and then {@link TaskExecutionService} cancel this task and send the
* {@link TaskExecutionState} to JobMaster.
*
- * @param taskId TaskGroup.getId()
+ * @param taskGroupLocation TaskGroup.getTaskGroupInfo()
*/
- public void cancelTaskGroup(long taskId) {
- logger.info(String.format("Task (%s) need cancel.", taskId));
- if (cancellationFutures.containsKey(taskId)) {
- cancellationFutures.get(taskId).cancel(false);
+ public void cancelTaskGroup(TaskGroupLocation taskGroupLocation) {
+ logger.info(String.format("Task (%s) need cancel.", taskGroupLocation));
+ if (cancellationFutures.containsKey(taskGroupLocation)) {
+ cancellationFutures.get(taskGroupLocation).cancel(false);
} else {
- logger.warning(String.format("need cancel taskId : %s is not exist", taskId));
+ logger.warning(String.format("need cancel taskId : %s is not exist", taskGroupLocation));
}
}
@@ -379,14 +383,16 @@ public class TaskExecutionService {
void taskDone() {
if (completionLatch.decrementAndGet() == 0) {
- executionContexts.remove(taskGroup.getId());
+ TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupInfo();
+ executionContexts.remove(taskGroupLocation);
+ cancellationFutures.remove(taskGroupLocation);
Throwable ex = executionException.get();
if (ex == null) {
- future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FINISHED, null));
+ future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FINISHED, null));
} else if (isCancel.get()) {
- future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.CANCELED, null));
+ future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.CANCELED, null));
} else {
- future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, ex));
+ future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, ex));
}
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 18a187913..f046f65f6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -42,6 +42,7 @@ import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlo
import org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
@@ -178,17 +179,18 @@ public class PhysicalPlanGenerator {
// if sinkAggregatedCommitter is empty, don't create task.
if (sinkAggregatedCommitter.isPresent()) {
long taskGroupID = idGenerator.getNextId();
+ TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
SinkAggregatedCommitterTask<?> t =
new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s,
+ new TaskLocation(taskGroupLocation, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s,
sinkAggregatedCommitter.get());
- committerTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
+ committerTaskIDMap.put(s, new TaskLocation(taskGroupLocation, t.getTaskID()));
return new PhysicalVertex(idGenerator.getNextId(),
atomicInteger.incrementAndGet(),
executorService,
collect.size(),
- new TaskGroupDefaultImpl(taskGroupID, s.getName() + "-AggregatedCommitterTask",
+ new TaskGroupDefaultImpl(taskGroupLocation, s.getName() + "-AggregatedCommitterTask",
Lists.newArrayList(t)),
flakeIdGenerator,
pipelineIndex,
@@ -215,15 +217,16 @@ public class PhysicalPlanGenerator {
long taskGroupIDPrefix = idGenerator.getNextId();
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
+ TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
setFlowConfig(flow, i);
SeaTunnelTask seaTunnelTask = new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, mixIDPrefixAndIndex(taskIDPrefix, i)), i, flow);
+ new TaskLocation(taskGroupLocation, mixIDPrefixAndIndex(taskIDPrefix, i)), i, flow);
t.add(new PhysicalVertex(idGenerator.getNextId(),
i,
executorService,
flow.getAction().getParallelism(),
- new TaskGroupDefaultImpl(taskGroupID, flow.getAction().getName() +
+ new TaskGroupDefaultImpl(taskGroupLocation, flow.getAction().getName() +
"-PartitionTransformTask",
Lists.newArrayList(seaTunnelTask)),
flakeIdGenerator,
@@ -245,15 +248,16 @@ public class PhysicalPlanGenerator {
return sources.stream().map(s -> {
long taskGroupID = idGenerator.getNextId();
+ TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s);
- enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
+ new TaskLocation(taskGroupLocation, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s);
+ enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupLocation, t.getTaskID()));
return new PhysicalVertex(idGenerator.getNextId(),
atomicInteger.incrementAndGet(),
executorService,
sources.size(),
- new TaskGroupDefaultImpl(taskGroupID, s.getName() + "-SplitEnumerator",
+ new TaskGroupDefaultImpl(taskGroupLocation, s.getName() + "-SplitEnumerator",
Lists.newArrayList(t)),
flakeIdGenerator,
pipelineIndex,
@@ -282,6 +286,7 @@ public class PhysicalPlanGenerator {
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
int finalParallelismIndex = i;
long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
+ TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
List<SeaTunnelTask> taskList =
flows.stream().map(f -> {
setFlowConfig(f, finalParallelismIndex);
@@ -289,12 +294,12 @@ public class PhysicalPlanGenerator {
flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> idGenerator.getNextId());
if (f instanceof PhysicalExecutionFlow) {
return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID,
+ new TaskLocation(taskGroupLocation,
mixIDPrefixAndIndex(taskIDPrefix, finalParallelismIndex)),
finalParallelismIndex, f);
} else {
return new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID,
+ new TaskLocation(taskGroupLocation,
mixIDPrefixAndIndex(taskIDPrefix, finalParallelismIndex)),
finalParallelismIndex, f);
}
@@ -308,7 +313,7 @@ public class PhysicalPlanGenerator {
i,
executorService,
flow.getAction().getParallelism(),
- new TaskGroupWithIntermediateQueue(taskGroupID, flow.getAction().getName() +
+ new TaskGroupWithIntermediateQueue(taskGroupLocation, flow.getAction().getName() +
"-SourceTask",
taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
flakeIdGenerator,
@@ -323,7 +328,7 @@ public class PhysicalPlanGenerator {
i,
executorService,
flow.getAction().getParallelism(),
- new TaskGroupDefaultImpl(taskGroupID, flow.getAction().getName() +
+ new TaskGroupDefaultImpl(taskGroupLocation, flow.getAction().getName() +
"-SourceTask",
taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
flakeIdGenerator,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index d1fe0844e..15e7f843c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
@@ -54,6 +55,8 @@ public class PhysicalVertex {
private final long physicalVertexId;
+ private final TaskGroupLocation taskGroupLocation;
+
/**
* the index of PhysicalVertex
*/
@@ -145,6 +148,7 @@ public class PhysicalVertex {
subTaskGroupIndex + 1,
parallelism);
this.taskFuture = new CompletableFuture<>();
+ this.taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, physicalVertexId);
}
public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
@@ -176,20 +180,20 @@ public class PhysicalVertex {
if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
turnToEndState(ExecutionState.CANCELED);
taskFuture.complete(
- new TaskExecutionState(this.physicalVertexId, ExecutionState.CANCELED, null));
+ new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null));
} else {
turnToEndState(ExecutionState.FAILED);
- taskFuture.complete(new TaskExecutionState(this.physicalVertexId, ExecutionState.FAILED,
+ taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
new JobException(String.format("%s turn to a unexpected state: %s, make it Failed",
this.getTaskFullName(), executionState.get()))));
}
}
} else if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
turnToEndState(ExecutionState.CANCELED);
- taskFuture.complete(new TaskExecutionState(this.physicalVertexId, executionState.get(), null));
+ taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(), null));
} else {
turnToEndState(ExecutionState.FAILED);
- taskFuture.complete(new TaskExecutionState(this.physicalVertexId, executionState.get(),
+ taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(),
new JobException(String.format("%s turn to a unexpected state"))));
}
@@ -199,7 +203,7 @@ public class PhysicalVertex {
ExceptionUtils.getMessage(th)));
turnToEndState(ExecutionState.FAILED);
taskFuture.complete(
- new TaskExecutionState(this.physicalVertexId, ExecutionState.FAILED, th));
+ new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, th));
}
if (waitForCompleteByExecutionService == null) {
@@ -211,7 +215,7 @@ public class PhysicalVertex {
if (t != null) {
LOGGER.severe("An unexpected error occurred while the task was running", t);
taskFuture.complete(
- new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED,
+ new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
t));
} else {
turnToEndState(v.getExecutionState());
@@ -231,7 +235,7 @@ public class PhysicalVertex {
LOGGER.severe(
String.format("%s end with Exception: %s", this.taskFullName, ExceptionUtils.getMessage(th)));
turnToEndState(ExecutionState.FAILED);
- v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, th);
+ v = new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, th);
taskFuture.complete(v);
}
});
@@ -309,7 +313,7 @@ public class PhysicalVertex {
public void cancel() {
if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) ||
updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED)) {
- taskFuture.complete(new TaskExecutionState(this.physicalVertexId, ExecutionState.CANCELED, null));
+ taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null));
} else if (updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELING)) {
// do nothing, because even if task is deployed to TaskExecutionService, we can do the cancel in deploy method
} else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) {
@@ -325,7 +329,7 @@ public class PhysicalVertex {
try {
i++;
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- new CancelTaskOperation(taskGroup.getId()),
+ new CancelTaskOperation(taskGroup.getTaskGroupInfo()),
currentExecutionAddress)
.invoke().get();
return;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
index 72ed4526e..e31fdbcec 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
@@ -21,14 +21,14 @@ import java.io.Serializable;
public class TaskExecutionState implements Serializable {
- private final long taskExecutionId;
+ private final TaskGroupLocation taskGroupLocation;
private final ExecutionState executionState;
private Throwable throwable;
- public TaskExecutionState(long taskExecutionId, ExecutionState executionState, Throwable throwable) {
- this.taskExecutionId = taskExecutionId;
+ public TaskExecutionState(TaskGroupLocation taskGroupLocation, ExecutionState executionState, Throwable throwable) {
+ this.taskGroupLocation = taskGroupLocation;
this.executionState = executionState;
this.throwable = throwable;
}
@@ -41,7 +41,7 @@ public class TaskExecutionState implements Serializable {
return throwable;
}
- public long getTaskExecutionId() {
- return taskExecutionId;
+ public TaskGroupLocation getTaskGroupInfo() {
+ return taskGroupLocation;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index ec14bdeb0..cee3eeaf3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -23,7 +23,7 @@ import java.util.Map;
public interface TaskGroup extends Serializable {
- long getId();
+ TaskGroupLocation getTaskGroupInfo();
void init();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
index 76b95ea26..d0d62364e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
@@ -23,14 +23,14 @@ import java.util.function.Function;
import java.util.stream.Collectors;
public class TaskGroupDefaultImpl implements TaskGroup {
- private final long id;
+ private final TaskGroupLocation taskGroupLocation;
private final String taskGroupName;
private final Map<Long, Task> tasks;
- public TaskGroupDefaultImpl(long id, String taskGroupName, Collection<Task> tasks) {
- this.id = id;
+ public TaskGroupDefaultImpl(TaskGroupLocation taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
+ this.taskGroupLocation = taskGroupLocation;
this.taskGroupName = taskGroupName;
this.tasks = tasks.stream().collect(Collectors.toMap(Task::getTaskID, Function.identity()));
}
@@ -40,8 +40,8 @@ public class TaskGroupDefaultImpl implements TaskGroup {
}
@Override
- public long getId() {
- return id;
+ public TaskGroupLocation getTaskGroupInfo() {
+ return taskGroupLocation;
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
new file mode 100644
index 000000000..71c63319d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class TaskGroupLocation implements Serializable {
+ private final long jobId;
+
+ private final long pipelineId;
+
+ private final long taskGroupId;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TaskGroupLocation that = (TaskGroupLocation) o;
+
+ return new EqualsBuilder().append(jobId, that.jobId).append(pipelineId, that.pipelineId).append(taskGroupId, that.taskGroupId).isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37).append(jobId).append(pipelineId).append(taskGroupId).toHashCode();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index d361037bd..cd126b19e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -22,30 +22,31 @@ import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Objects;
public class TaskLocation implements IdentifiedDataSerializable, Serializable {
- private long taskGroupID;
+ private TaskGroupLocation taskGroupLocation;
private long taskID;
public TaskLocation() {
}
- public TaskLocation(long taskGroupID, long taskID) {
- this.taskGroupID = taskGroupID;
+ public TaskLocation(TaskGroupLocation taskGroupLocation, long taskID) {
+ this.taskGroupLocation = taskGroupLocation;
this.taskID = taskID;
}
- public long getTaskGroupID() {
- return taskGroupID;
+ public TaskGroupLocation getTaskGroupLocation() {
+ return taskGroupLocation;
}
- public void setTaskGroupID(long taskGroupID) {
- this.taskGroupID = taskGroupID;
+ public void setTaskGroupLocation(TaskGroupLocation taskGroupLocation) {
+ this.taskGroupLocation = taskGroupLocation;
}
public long getTaskID() {
@@ -68,22 +69,22 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
@Override
public void writeData(ObjectDataOutput out) throws IOException {
- out.writeLong(taskGroupID);
+ out.writeObject(taskGroupLocation);
out.writeLong(taskID);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
- taskGroupID = in.readLong();
+ taskGroupLocation = in.readObject();
taskID = in.readLong();
}
@Override
public String toString() {
return "TaskLocation{" +
- "taskGroupID=" + taskGroupID +
- ", taskID=" + taskID +
- '}';
+ "taskGroupLocation=" + taskGroupLocation +
+ ", taskID=" + taskID +
+ '}';
}
@Override
@@ -95,11 +96,11 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
return false;
}
TaskLocation that = (TaskLocation) o;
- return taskGroupID == that.taskGroupID && taskID == that.taskID;
+ return new EqualsBuilder().append(taskID, that.taskID).append(taskGroupLocation, that.taskGroupLocation).isEquals();
}
@Override
public int hashCode() {
- return Objects.hash(taskGroupID, taskID);
+ return new HashCodeBuilder(17, 37).append(taskGroupLocation).append(taskID).toHashCode();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index f849dc9a7..cb459944c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -17,12 +17,14 @@
package org.apache.seatunnel.engine.server.resourcemanager;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
import com.hazelcast.cluster.Address;
import lombok.NonNull;
public interface ResourceManager {
- Address applyForResource(long jobId, long taskId);
+ Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
@NonNull
- Address getAppliedResource(long jobId, long taskId);
+ Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
index 59bf5b204..28bbb81d1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.resourcemanager;
import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import com.hazelcast.cluster.Address;
import com.hazelcast.spi.impl.NodeEngine;
@@ -31,7 +32,7 @@ import java.util.Map;
public class SimpleResourceManager implements ResourceManager {
// TODO We may need more detailed resource define, instead of the resource definition method of only Address.
- private Map<Long, Map<Long, Address>> physicalVertexIdAndResourceMap = new HashMap<>();
+ private Map<Long, Map<TaskGroupLocation, Address>> physicalVertexIdAndResourceMap = new HashMap<>();
private final NodeEngine nodeEngine;
@@ -41,14 +42,14 @@ public class SimpleResourceManager implements ResourceManager {
@SuppressWarnings("checkstyle:MagicNumber")
@Override
- public Address applyForResource(long jobId, long taskId) {
- Map<Long, Address> jobAddressMap =
+ public Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation) {
+ Map<TaskGroupLocation, Address> jobAddressMap =
physicalVertexIdAndResourceMap.computeIfAbsent(jobId, k -> new HashMap<>());
Address localhost =
- jobAddressMap.putIfAbsent(taskId, nodeEngine.getThisAddress());
+ jobAddressMap.putIfAbsent(taskGroupLocation, nodeEngine.getThisAddress());
if (null == localhost) {
- localhost = jobAddressMap.get(taskId);
+ localhost = jobAddressMap.get(taskGroupLocation);
}
return localhost;
@@ -57,13 +58,13 @@ public class SimpleResourceManager implements ResourceManager {
@Override
@NonNull
- public Address getAppliedResource(long jobId, long taskId) {
- Map<Long, Address> longAddressMap = physicalVertexIdAndResourceMap.get(jobId);
+ public Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation) {
+ Map<TaskGroupLocation, Address> longAddressMap = physicalVertexIdAndResourceMap.get(jobId);
if (null == longAddressMap || longAddressMap.isEmpty()) {
throw new JobException(
- String.format("Job %s, Task %s can not found applied resource.", jobId, taskId));
+ String.format("Job %s, Task %s can not found applied resource.", jobId, taskGroupLocation));
}
- return longAddressMap.get(taskId);
+ return longAddressMap.get(taskGroupLocation);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 8d42737d2..797de9365 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -100,7 +100,7 @@ public class PipelineBaseScheduler implements JobScheduler {
// TODO If there is no enough resources for tasks, we need add some wait profile
if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
- task.getTaskGroup().getId());
+ task.getTaskGroup().getTaskGroupInfo());
} else {
handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
}
@@ -115,7 +115,7 @@ public class PipelineBaseScheduler implements JobScheduler {
return CompletableFuture.supplyAsync(() -> {
task.deploy(
resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
- task.getTaskGroup().getId()));
+ task.getTaskGroup().getTaskGroupInfo()));
return null;
});
} else {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
index 08d5de1c1..682b11640 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.group;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import java.util.Collection;
@@ -32,8 +33,8 @@ public class TaskGroupWithIntermediateQueue extends TaskGroupDefaultImpl {
public static final int QUEUE_SIZE = 1000;
- public TaskGroupWithIntermediateQueue(long id, String taskGroupName, Collection<Task> tasks) {
- super(id, taskGroupName, tasks);
+ public TaskGroupWithIntermediateQueue(TaskGroupLocation taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
+ super(taskGroupLocation, taskGroupName, tasks);
}
private Map<Long, BlockingQueue<Record<?>>> blockingQueueCache = null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
index 37f45c17b..7c75f0960 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.task.operation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
@@ -33,13 +34,13 @@ import java.io.IOException;
* notified JobMaster
*/
public class CancelTaskOperation extends Operation implements IdentifiedDataSerializable {
- private long taskGroupId;
+ private TaskGroupLocation taskGroupLocation;
public CancelTaskOperation() {
}
- public CancelTaskOperation(long taskGroupId) {
- this.taskGroupId = taskGroupId;
+ public CancelTaskOperation(TaskGroupLocation taskGroupLocation) {
+ this.taskGroupLocation = taskGroupLocation;
}
@Override
@@ -55,7 +56,7 @@ public class CancelTaskOperation extends Operation implements IdentifiedDataSeri
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- server.getTaskExecutionService().cancelTaskGroup(taskGroupId);
+ server.getTaskExecutionService().cancelTaskGroup(taskGroupLocation);
}
@Override
@@ -66,12 +67,12 @@ public class CancelTaskOperation extends Operation implements IdentifiedDataSeri
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- out.writeLong(taskGroupId);
+ out.writeObject(taskGroupLocation);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- taskGroupId = in.readLong();
+ taskGroupLocation = in.readObject();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index 70f0365de..ccbc8d519 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -56,7 +56,7 @@ public class SinkRegisterOperation extends Operation implements IdentifiedDataSe
SinkAggregatedCommitterTask<?> task = null;
for (int i = 0; i < RETRY_TIME; i++) {
try {
- task = server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupID())
+ task = server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation())
.getTaskGroup().getTask(committerTaskID.getTaskID());
break;
} catch (NullPointerException e) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
index cfea0b586..558593551 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
@@ -46,7 +46,7 @@ public class SinkUnregisterOperation extends Operation implements IdentifiedData
public void run() throws Exception {
SeaTunnelServer server = getService();
SinkAggregatedCommitterTask<?> task =
- server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupID()).getTaskGroup().getTask(committerTaskID.getTaskID());
+ server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
task.receivedWriterUnregister(currentTaskID);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 8fba46f6f..ddbe9ae59 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -51,7 +51,7 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
SourceSeaTunnelTask<?, SplitT> task =
- server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupID()).getTaskGroup()
+ server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupLocation()).getTaskGroup()
.getTask(taskID.getTaskID());
task.receivedSourceSplit(splits);
return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
index 18537e1f4..f25b895b3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
@@ -47,7 +47,7 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
SourceSeaTunnelTask<?, ?> task =
- server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupID())
+ server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupLocation())
.getTaskGroup().getTask(readerLocation.getTaskID());
task.close();
return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 1d6edefd5..d214368a1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -51,7 +51,7 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask<?> task =
- server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
+ server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
.getTaskGroup().getTask(enumeratorTaskID.getTaskID());
task.requestSplit(taskID.getTaskID());
return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 626ff1911..58b09146e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -49,7 +49,7 @@ public class SourceNoMoreElementOperation extends Operation implements Identifie
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask<?> task =
- server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
+ server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
.getTaskGroup().getTask(enumeratorTaskID.getTaskID());
task.readerFinished(currentTaskID.getTaskID());
return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 1f89f1f9d..a2f08bf70 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -57,7 +57,7 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
Address readerAddress = getCallerAddress();
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask<?> task =
- server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID()).getTaskGroup()
+ server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation()).getTaskGroup()
.getTask(enumeratorTaskID.getTaskID());
task.receivedReader(readerTaskID, readerAddress);
return null;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 26c556f1d..d6d371e49 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TestTask;
import com.google.common.collect.Lists;
@@ -50,6 +51,8 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
FlakeIdGenerator flakeIdGenerator;
long taskRunTime = 2000;
+ long jobId = 10001;
+ long pipeLineId = 100001;
@Before
@Override
@@ -87,10 +90,10 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
TestTask testTask1 = new TestTask(stop, logger, sleepTime, true);
TestTask testTask2 = new TestTask(stop, logger, sleepTime, false);
- long taskGroupId = flakeIdGenerator.newId();
- CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(taskGroupId, "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
+ TaskGroupDefaultImpl ts = new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(testTask1, testTask2));
+ CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(ts, new CompletableFuture<>());
- taskExecutionService.cancelTaskGroup(taskGroupId);
+ taskExecutionService.cancelTaskGroup(ts.getTaskGroupInfo());
await().atMost(sleepTime + 1000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertEquals(CANCELED, completableFuture.get().getExecutionState()));
@@ -106,7 +109,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
TestTask testTask1 = new TestTask(stop, logger, sleepTime, true);
TestTask testTask2 = new TestTask(stop, logger, sleepTime, false);
- final CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
+ final CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
completableFuture.whenComplete((unused, throwable) -> futureMark.set(true));
stop.set(true);
@@ -133,7 +136,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
TaskExecutionService taskExecutionService = server.getTaskExecutionService();
- CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "t1", Lists.newArrayList(criticalTask)), new CompletableFuture<>());
+ CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "t1", Lists.newArrayList(criticalTask)), new CompletableFuture<>());
// Run it for a while
Thread.sleep(taskRunTime);
@@ -178,11 +181,11 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
tasks.addAll(lowLagTask);
Collections.shuffle(tasks);
- CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(tasks)), new CompletableFuture<>());
+ CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(tasks)), new CompletableFuture<>());
- CompletableFuture<TaskExecutionState> t1c = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "t1", Lists.newArrayList(t1)), new CompletableFuture<>());
+ CompletableFuture<TaskExecutionState> t1c = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "t1", Lists.newArrayList(t1)), new CompletableFuture<>());
- CompletableFuture<TaskExecutionState> t2c = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "t2", Lists.newArrayList(t2)), new CompletableFuture<>());
+ CompletableFuture<TaskExecutionState> t2c = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "t2", Lists.newArrayList(t2)), new CompletableFuture<>());
Thread.sleep(taskRunTime);
@@ -221,7 +224,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
tasks.addAll(lowLagTask);
Collections.shuffle(tasks);
- TaskGroupDefaultImpl taskGroup = new TaskGroupDefaultImpl(flakeIdGenerator.newId(), "ts", Lists.newArrayList(tasks));
+ TaskGroupDefaultImpl taskGroup = new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, flakeIdGenerator.newId()), "ts", Lists.newArrayList(tasks));
logger.info("task size is : " + taskGroup.getTasks().size());