You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that did not survive the plain-text conversion.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/05 07:57:02 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][Task] Move SourceSplitEnumeratorTask to worker node (#2628)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new b830741c7 [Engine][Task] Move SourceSplitEnumeratorTask to worker node (#2628)
b830741c7 is described below
commit b830741c747fa04fb1f5bc9f08d6ca9fa9260572
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Sep 5 15:56:55 2022 +0800
[Engine][Task] Move SourceSplitEnumeratorTask to worker node (#2628)
* [Engine][Task] Move SourceSplitEnumeratorTask to worker node
---
.../apache/seatunnel/common/utils/RetryUtils.java | 4 +-
seatunnel-engine/seatunnel-engine-client/pom.xml | 1 -
seatunnel-engine/seatunnel-engine-server/pom.xml | 1 -
.../engine/server/TaskExecutionService.java | 9 +---
.../engine/server/dag/physical/PhysicalVertex.java | 25 +++++----
.../server/execution/TaskExecutionContext.java | 9 +---
.../seatunnel/engine/server/master/JobMaster.java | 28 +++++++---
.../engine/server/scheduler/JobScheduler.java | 7 ++-
.../server/scheduler/PipelineBaseScheduler.java | 63 ++++++++++++----------
.../serializable/TaskDataSerializerHook.java | 6 +++
.../server/service/slot/DefaultSlotService.java | 7 ++-
.../engine/server/service/slot/SlotContext.java | 1 -
.../engine/server/service/slot/SlotService.java | 2 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 28 +++++++---
.../server/task/flow/SourceFlowLifeCycle.java | 36 ++++++++-----
.../server/task/operation/DeployTaskOperation.java | 4 +-
...tion.java => GetTaskGroupAddressOperation.java} | 60 +++++++++++++--------
17 files changed, 180 insertions(+), 111 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
index f6ac9a694..55492a4e4 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
@@ -34,12 +34,14 @@ public class RetryUtils {
if (retryMaterial.getRetryTimes() < 0) {
throw new IllegalArgumentException("Retry times must be greater than 0");
}
+ Exception lastE;
int i = 0;
do {
i++;
try {
return execution.execute();
} catch (Exception e) {
+ lastE = e;
if (retryCondition != null && !retryCondition.canRetry(e)) {
if (retryMaterial.shouldThrowException()) {
throw e;
@@ -50,7 +52,7 @@ public class RetryUtils {
}
} while (i <= retryTimes);
if (retryMaterial.shouldThrowException()) {
- throw new RuntimeException("Execute given execution failed after retry " + retryTimes + " times");
+ throw new RuntimeException("Execute given execution failed after retry " + retryTimes + " times", lastE);
}
return null;
}
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
index 33fd88a8c..b24ad2088 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -59,7 +59,6 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>checkpoint-storage-local-file</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml
index c17fb0bbd..d02bcf67c 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -43,7 +43,6 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>checkpoint-storage-local-file</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 82a6bbf52..81875a852 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -39,7 +39,6 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
import org.apache.seatunnel.engine.server.operation.NotifyTaskStatusOperation;
-import org.apache.seatunnel.engine.server.service.slot.SlotContext;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import com.google.common.collect.Lists;
@@ -85,7 +84,6 @@ public class TaskExecutionService {
// key: TaskID
private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
- private SlotContext slotContext;
public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -102,10 +100,6 @@ public class TaskExecutionService {
executorService.shutdownNow();
}
- public void setSlotContext(SlotContext slotContext) {
- this.slotContext = slotContext;
- }
-
public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
return executionContexts.get(taskGroupLocation);
}
@@ -186,8 +180,7 @@ public class TaskExecutionService {
final Map<Boolean, List<Task>> byCooperation =
tasks.stream()
.peek(task -> {
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext(task, nodeEngine,
- slotContext);
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext(task, nodeEngine);
task.setTaskExecutionContext(taskExecutionContext);
taskExecutionContextMap.put(task.getTaskID(), taskExecutionContext);
})
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 08ab71c15..1e8dc0ecf 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -144,26 +144,22 @@ public class PhysicalVertex {
subTaskGroupIndex + 1,
parallelism);
this.taskFuture = new CompletableFuture<>();
- this.taskGroupLocation = new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, physicalVertexId);
+ this.taskGroupLocation = taskGroup.getTaskGroupLocation();
}
public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
return new PassiveCompletableFuture<>(this.taskFuture);
}
- public void deployOnMaster() {
- currentExecutionAddress = nodeEngine.getMasterAddress();
+ private void deployOnLocal(@NonNull SlotProfile slotProfile) {
deployInternal(taskGroupImmutableInformation -> {
SeaTunnelServer server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
- return new PassiveCompletableFuture<>(server.getTaskExecutionService()
- .deployTask(taskGroupImmutableInformation));
+ return new PassiveCompletableFuture<>(server.getSlotService().getSlotContext(slotProfile)
+ .getTaskExecutionService().deployTask(taskGroupImmutableInformation));
});
}
- @SuppressWarnings("checkstyle:MagicNumber")
- // This method must not throw an exception
- public void deploy(@NonNull SlotProfile slotProfile) {
- currentExecutionAddress = slotProfile.getWorker();
+ private void deployOnRemote(@NonNull SlotProfile slotProfile) {
deployInternal(taskGroupImmutableInformation -> new PassiveCompletableFuture<>(
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
new DeployTaskOperation(slotProfile,
@@ -172,6 +168,17 @@ public class PhysicalVertex {
.invoke()));
}
+ @SuppressWarnings("checkstyle:MagicNumber")
+ // This method must not throw an exception
+ public void deploy(@NonNull SlotProfile slotProfile) {
+ currentExecutionAddress = slotProfile.getWorker();
+ if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
+ deployOnLocal(slotProfile);
+ } else {
+ deployOnRemote(slotProfile);
+ }
+ }
+
private void deployInternal(Function<TaskGroupImmutableInformation, PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
TaskGroupImmutableInformation taskGroupImmutableInformation = getTaskGroupImmutableInformation();
PassiveCompletableFuture<TaskExecutionState> completeFuture;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 5b8add35c..4ba8d075f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.engine.server.execution;
-import org.apache.seatunnel.engine.server.service.slot.SlotContext;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
@@ -31,12 +30,9 @@ public class TaskExecutionContext {
private final Task task;
private final NodeEngineImpl nodeEngine;
- private final SlotContext slotContext;
-
- public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine, SlotContext slotContext) {
+ public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine) {
this.task = task;
this.nodeEngine = nodeEngine;
- this.slotContext = slotContext;
}
public <E> InvocationFuture<E> sendToMaster(Operation operation) {
@@ -55,7 +51,4 @@ public class TaskExecutionContext {
return (T) task;
}
- public SlotContext getSlotContext() {
- return slotContext;
- }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 942dc8413..98d865f55 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -29,11 +29,13 @@ import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfiguration;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
-import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
+import com.hazelcast.cluster.Address;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.datamodel.Tuple2;
@@ -45,7 +47,9 @@ import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
public class JobMaster implements Runnable {
@@ -69,6 +73,8 @@ public class JobMaster implements Runnable {
private JobImmutableInformation jobImmutableInformation;
+ private final Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles;
+
public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService, @NonNull ResourceManager resourceManager) {
@@ -76,8 +82,8 @@ public class JobMaster implements Runnable {
this.nodeEngine = nodeEngine;
this.executorService = executorService;
flakeIdGenerator =
- this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
-
+ this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
+ this.ownedSlotProfiles = new ConcurrentHashMap<>();
this.resourceManager = resourceManager;
}
@@ -127,9 +133,7 @@ public class JobMaster implements Runnable {
}
jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
});
-
- JobScheduler jobScheduler = new PipelineBaseScheduler(physicalPlan, this);
- jobScheduler.startScheduling();
+ ownedSlotProfiles.putAll(new PipelineBaseScheduler(physicalPlan, this).startScheduling());
} catch (Throwable e) {
LOGGER.severe(String.format("Job %s (%s) run error with: %s",
physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
@@ -146,6 +150,18 @@ public class JobMaster implements Runnable {
// TODO Add some job clean operation
}
+ public Address queryTaskGroupAddress(long taskGroupId) {
+ for (Integer pipelineIndex : ownedSlotProfiles.keySet()) {
+ Optional<PhysicalVertex> currentVertex = ownedSlotProfiles.get(pipelineIndex).keySet().stream()
+ .filter(physicalVertex -> physicalVertex.getTaskGroup().getTaskGroupLocation().getTaskGroupId() == taskGroupId)
+ .findFirst();
+ if (currentVertex.isPresent()) {
+ return ownedSlotProfiles.get(pipelineIndex).get(currentVertex.get()).getWorker();
+ }
+ }
+ throw new IllegalArgumentException("can't find task group address from task group id: " + taskGroupId);
+ }
+
public void cancelJob() {
this.physicalPlan.cancelJob();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
index bbe003e39..25aece4a4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.engine.server.scheduler;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import java.util.Map;
+
public interface JobScheduler {
- void startScheduling();
+ Map<Integer, Map<PhysicalVertex, SlotProfile>> startScheduling();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 6c561a709..22a6a0f16 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -41,8 +41,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
public class PipelineBaseScheduler implements JobScheduler {
@@ -61,7 +61,8 @@ public class PipelineBaseScheduler implements JobScheduler {
}
@Override
- public void startScheduling() {
+ public Map<Integer, Map<PhysicalVertex, SlotProfile>> startScheduling() {
+ Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles = new ConcurrentHashMap<>();
if (physicalPlan.turnToRunning()) {
List<CompletableFuture<Object>> collect = physicalPlan.getPipelineList().stream().map(pipeline -> {
if (!pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED)) {
@@ -71,10 +72,13 @@ public class PipelineBaseScheduler implements JobScheduler {
Map<PhysicalVertex, SlotProfile> slotProfiles;
try {
slotProfiles = applyResourceForPipeline(pipeline);
+ ownedSlotProfiles.put(pipeline.getPipelineIndex(), slotProfiles);
} catch (Exception e) {
throw new RuntimeException(e);
}
- pipeline.whenComplete((state, error) -> releasePipelineResource(Lists.newArrayList(slotProfiles.values())));
+ pipeline.whenComplete((state, error) -> {
+ releasePipelineResource(Lists.newArrayList(slotProfiles.values()));
+ });
// deploy pipeline
return CompletableFuture.supplyAsync(() -> {
// TODO before deploy should check slotProfiles is exist, because it maybe can't use when retry.
@@ -95,6 +99,7 @@ public class PipelineBaseScheduler implements JobScheduler {
throw new JobException(String.format("%s turn to a unexpected state: %s", physicalPlan.getJobFullName(),
physicalPlan.getJobStatus()));
}
+ return ownedSlotProfiles;
}
private void releasePipelineResource(List<SlotProfile> slotProfiles) {
@@ -108,19 +113,11 @@ public class PipelineBaseScheduler implements JobScheduler {
try {
Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
- subPlan.getCoordinatorVertexList().forEach(coordinator -> {
- coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
- });
+ // TODO If there is no enough resources for tasks, we need add some wait profile
+ subPlan.getCoordinatorVertexList().forEach(coordinator -> futures.put(coordinator, applyResourceForTask(coordinator)));
+
+ subPlan.getPhysicalVertexList().forEach(task -> futures.put(task, applyResourceForTask(task)));
- subPlan.getPhysicalVertexList().forEach(task -> {
- // TODO If there is no enough resources for tasks, we need add some wait profile
- if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
- // TODO custom resource size
- futures.put(task, resourceManager.applyResource(jobId, new ResourceProfile()));
- } else {
- handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
- }
- });
for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
try {
slotProfiles.put(future.getKey(), future.getValue().get());
@@ -136,10 +133,23 @@ public class PipelineBaseScheduler implements JobScheduler {
}
}
- private CompletableFuture<Void> deployTask(PhysicalVertex task, Supplier<Void> deployMethod) {
+ private CompletableFuture<SlotProfile> applyResourceForTask(PhysicalVertex task) {
+ if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+ // TODO custom resource size
+ return resourceManager.applyResource(jobId, new ResourceProfile());
+ } else {
+ handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+ return null;
+ }
+ }
+
+ private CompletableFuture<Void> deployTask(PhysicalVertex task, SlotProfile slotProfile) {
if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
// deploy is a time-consuming operation, so we do it async
- return CompletableFuture.supplyAsync(deployMethod);
+ return CompletableFuture.supplyAsync(() -> {
+ task.deploy(slotProfile);
+ return null;
+ });
} else {
handleTaskStateUpdateError(task, ExecutionState.DEPLOYING);
}
@@ -148,19 +158,16 @@ public class PipelineBaseScheduler implements JobScheduler {
private void deployPipeline(@NonNull SubPlan pipeline, Map<PhysicalVertex, SlotProfile> slotProfiles) {
if (pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING)) {
- List<CompletableFuture<?>> deployCoordinatorFuture =
- pipeline.getCoordinatorVertexList().stream().map(coordinator -> deployTask(coordinator, () -> {
- coordinator.deployOnMaster();
- return null;
- })).filter(Objects::nonNull).collect(Collectors.toList());
-
- List<CompletableFuture<?>> deployTaskFuture =
- pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task, () -> {
- task.deploy(slotProfiles.get(task));
- return null;
- })).filter(Objects::nonNull).collect(Collectors.toList());
try {
+ List<CompletableFuture<?>> deployCoordinatorFuture =
+ pipeline.getCoordinatorVertexList().stream().map(coordinator -> deployTask(coordinator, slotProfiles.get(coordinator)))
+ .filter(Objects::nonNull).collect(Collectors.toList());
+
+ List<CompletableFuture<?>> deployTaskFuture =
+ pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task, slotProfiles.get(task)))
+ .filter(Objects::nonNull).collect(Collectors.toList());
+
deployCoordinatorFuture.addAll(deployTaskFuture);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
deployCoordinatorFuture.toArray(new CompletableFuture[0]));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 3ab25429f..90a0011dd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.server.task.Progress;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
+import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
@@ -65,6 +66,9 @@ public class TaskDataSerializerHook implements DataSerializerHook {
public static final int CANCEL_TASK_OPERATOR = 13;
+ public static final int GET_TASKGROUP_ADDRESS_TYPE = 14;
+
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -111,6 +115,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
return new DeployTaskOperation();
case CANCEL_TASK_OPERATOR:
return new CancelTaskOperation();
+ case GET_TASKGROUP_ADDRESS_TYPE:
+ return new GetTaskGroupAddressOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 5c60d25cf..691cbb815 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -134,8 +134,11 @@ public class DefaultSlotService implements SlotService {
return new SlotAndWorkerProfile(toWorkerProfile(), profile);
}
- public SlotContext getSlotContext(int slotID) {
- return contexts.get(slotID);
+ public SlotContext getSlotContext(SlotProfile slotProfile) {
+ if (!contexts.containsKey(slotProfile.getSlotID())) {
+ throw new WrongTargetSlotException("Unknown slot in slot service, slot profile: " + slotProfile);
+ }
+ return contexts.get(slotProfile.getSlotID());
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
index d975de191..8ec29605e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
@@ -26,7 +26,6 @@ public class SlotContext {
public SlotContext(int slotID, TaskExecutionService taskExecutionService) {
this.slotID = slotID;
this.taskExecutionService = taskExecutionService;
- this.taskExecutionService.setSlotContext(this);
}
public int getSlotID() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
index 5d5a5ab14..d39276b26 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
@@ -28,7 +28,7 @@ public interface SlotService {
SlotAndWorkerProfile requestSlot(long jobID, ResourceProfile resourceProfile);
- SlotContext getSlotContext(int slotID);
+ SlotContext getSlotContext(SlotProfile slotProfile);
void releaseSlot(long jobId, SlotProfile slotProfile);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 939033c3f..7c93f4113 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -27,15 +27,19 @@ import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
+import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
import org.apache.seatunnel.engine.server.task.record.ClosedSign;
+import com.hazelcast.cluster.Address;
+
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
public class SinkFlowLifeCycle<T, StateT> extends AbstractFlowLifeCycle implements OneInputFlowLifeCycle<Record<?>>, InternalCheckpointListener {
@@ -53,18 +57,20 @@ public class SinkFlowLifeCycle<T, StateT> extends AbstractFlowLifeCycle implemen
private final TaskLocation taskLocation;
- private final TaskLocation committerTaskID;
+ private Address committerTaskAddress;
+
+ private final TaskLocation committerTaskLocation;
private final boolean containCommitter;
public SinkFlowLifeCycle(SinkAction<T, StateT, ?, ?> sinkAction, TaskLocation taskLocation, int indexID,
- SeaTunnelTask runningTask, TaskLocation committerTaskID,
+ SeaTunnelTask runningTask, TaskLocation committerTaskLocation,
boolean containCommitter, CompletableFuture<Void> completableFuture) {
super(runningTask, completableFuture);
this.sinkAction = sinkAction;
this.indexID = indexID;
this.taskLocation = taskLocation;
- this.committerTaskID = committerTaskID;
+ this.committerTaskLocation = committerTaskLocation;
this.containCommitter = containCommitter;
}
@@ -78,23 +84,31 @@ public class SinkFlowLifeCycle<T, StateT> extends AbstractFlowLifeCycle implemen
this.writerStateSerializer = sinkAction.getSink().getWriterStateSerializer();
this.protoStuffSerializer = new ProtoStuffSerializer();
states = null;
+ if (containCommitter) {
+ committerTaskAddress = getCommitterTaskAddress();
+ }
registerCommitter();
}
+ private Address getCommitterTaskAddress() throws ExecutionException, InterruptedException {
+ return (Address) runningTask.getExecutionContext()
+ .sendToMaster(new GetTaskGroupAddressOperation(committerTaskLocation)).get();
+ }
+
@Override
public void close() throws IOException {
super.close();
writer.close();
if (containCommitter) {
- runningTask.getExecutionContext().sendToMaster(new SinkUnregisterOperation(taskLocation,
- committerTaskID)).join();
+ runningTask.getExecutionContext().sendToMember(new SinkUnregisterOperation(taskLocation,
+ committerTaskLocation), committerTaskAddress).join();
}
}
private void registerCommitter() {
if (containCommitter) {
- runningTask.getExecutionContext().sendToMaster(new SinkRegisterOperation(taskLocation,
- committerTaskID)).join();
+ runningTask.getExecutionContext().sendToMember(new SinkRegisterOperation(taskLocation,
+ committerTaskLocation), committerTaskAddress).join();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index d3b5b0218..c2bebbab2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -29,10 +29,12 @@ import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SourceReaderContext;
+import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
import org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
import org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
import org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
+import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -47,7 +49,9 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
private static final ILogger LOGGER = Logger.getLogger(SourceFlowLifeCycle.class);
private final SourceAction<T, SplitT, ?> sourceAction;
- private final TaskLocation enumeratorTaskID;
+ private final TaskLocation enumeratorTaskLocation;
+
+ private Address enumeratorTaskAddress; // member hosting the enumerator's task group; looked up via the master before register()
private SourceReader<T, SplitT> reader;
@@ -56,20 +60,20 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
private transient org.apache.seatunnel.engine.checkpoint.storage.common.Serializer protoStuffSerializer;
private final int indexID;
- private final TaskLocation currentTaskID;
+ private final TaskLocation currentTaskLocation;
private SeaTunnelSourceCollector<T> collector;
private volatile boolean closed;
public SourceFlowLifeCycle(SourceAction<T, SplitT, ?> sourceAction, int indexID,
- TaskLocation enumeratorTaskID, SeaTunnelTask runningTask,
- TaskLocation currentTaskID, CompletableFuture<Void> completableFuture) {
+ TaskLocation enumeratorTaskLocation, SeaTunnelTask runningTask,
+ TaskLocation currentTaskLocation, CompletableFuture<Void> completableFuture) {
super(runningTask, completableFuture);
this.sourceAction = sourceAction;
this.indexID = indexID;
- this.enumeratorTaskID = enumeratorTaskID;
- this.currentTaskID = currentTaskID;
+ this.enumeratorTaskLocation = enumeratorTaskLocation;
+ this.currentTaskLocation = currentTaskLocation;
}
public void setCollector(SeaTunnelSourceCollector<T> collector) {
@@ -82,11 +86,17 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
this.splitSerializer = sourceAction.getSource().getSplitSerializer();
this.protoStuffSerializer = new ProtoStuffSerializer();
reader = sourceAction.getSource()
- .createReader(new SourceReaderContext(indexID, sourceAction.getSource().getBoundedness(), this));
+ .createReader(new SourceReaderContext(indexID, sourceAction.getSource().getBoundedness(), this));
reader.open();
+ enumeratorTaskAddress = getEnumeratorTaskAddress(); // must precede register(): register/requestSplit/no-more-element are all sent to this member
register();
}
+ private Address getEnumeratorTaskAddress() throws ExecutionException, InterruptedException { // blocking: waits on the master's reply to GetTaskGroupAddressOperation
+ return (Address) runningTask.getExecutionContext()
+ .sendToMaster(new GetTaskGroupAddressOperation(enumeratorTaskLocation)).get();
+ }
+
@Override
public void close() throws IOException {
collector.close();
@@ -104,8 +114,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
// ready close this reader
try {
this.closed = true;
- runningTask.getExecutionContext().sendToMaster(new SourceNoMoreElementOperation(currentTaskID,
- enumeratorTaskID)).get();
+ runningTask.getExecutionContext().sendToMember(new SourceNoMoreElementOperation(currentTaskLocation, // sent straight to the enumerator's member, no longer via the master
+ enumeratorTaskLocation), enumeratorTaskAddress).get();
} catch (Exception e) {
LOGGER.warning("source close failed ", e);
throw new RuntimeException(e);
@@ -114,8 +124,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
private void register() {
try {
- runningTask.getExecutionContext().sendToMaster(new SourceRegisterOperation(currentTaskID,
- enumeratorTaskID)).get();
+ runningTask.getExecutionContext().sendToMember(new SourceRegisterOperation(currentTaskLocation, // requires enumeratorTaskAddress to be resolved first
+ enumeratorTaskLocation), enumeratorTaskAddress).get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.warning("source register failed ", e);
throw new RuntimeException(e);
@@ -124,8 +134,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
public void requestSplit() {
try {
- runningTask.getExecutionContext().sendToMaster(new RequestSplitOperation(currentTaskID,
- enumeratorTaskID)).get();
+ runningTask.getExecutionContext().sendToMember(new RequestSplitOperation(currentTaskLocation, // sent straight to the enumerator's member, no longer via the master
+ enumeratorTaskLocation), enumeratorTaskAddress).get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.warning("source request split failed", e);
throw new RuntimeException(e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 155307a6e..a67a43456 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -46,8 +46,8 @@ public class DeployTaskOperation extends AsyncOperation {
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
SeaTunnelServer server = getService();
- return server.getSlotService().getSlotContext(slotProfile.getSlotID())
- .getTaskExecutionService().deployTask(taskImmutableInformation);
+ return server.getSlotService().getSlotContext(slotProfile) // slot context is now looked up by the whole SlotProfile rather than only its slot id
+ .getTaskExecutionService().deployTask(taskImmutableInformation);
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
similarity index 50%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 155307a6e..df6544140 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -17,55 +17,71 @@
package org.apache.seatunnel.engine.server.task.operation;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.operation.AsyncOperation;
-import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import com.hazelcast.internal.nio.IOUtil;
-import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.cluster.Address;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
-import lombok.NonNull;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-public class DeployTaskOperation extends AsyncOperation {
- private Data taskImmutableInformation;
- private SlotProfile slotProfile;
+public class GetTaskGroupAddressOperation extends Operation implements IdentifiedDataSerializable { // replies with the Address of the member running the task group of taskLocation
- public DeployTaskOperation() {
+ private TaskLocation taskLocation;
+
+ private Address response;
+
+ public GetTaskGroupAddressOperation() { // used by deserialization; taskLocation stays null until readInternal assigns it
}
- public DeployTaskOperation(@NonNull SlotProfile slotProfile, @NonNull Data taskImmutableInformation) {
- this.taskImmutableInformation = taskImmutableInformation;
- this.slotProfile = slotProfile;
+ public GetTaskGroupAddressOperation(TaskLocation taskLocation) {
+ this.taskLocation = taskLocation;
}
@Override
- protected PassiveCompletableFuture<?> doRun() throws Exception {
+ public void run() throws Exception {
SeaTunnelServer server = getService();
- return server.getSlotService().getSlotContext(slotProfile.getSlotID())
- .getTaskExecutionService().deployTask(taskImmutableInformation);
+ response = RetryUtils.retryWithException(() -> server.getJobMaster(taskLocation.getJobId())
+ .queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()),
+ new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+ exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP)); // retries only on NPE; presumably getJobMaster(...) returns null until the job master is registered - TODO confirm
}
@Override
- public int getClassId() {
- return TaskDataSerializerHook.DEPLOY_TASK_OPERATOR;
+ public Object getResponse() {
+ return response;
+ }
+
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- IOUtil.writeData(out, taskImmutableInformation);
- out.writeObject(slotProfile);
+ out.writeObject(taskLocation); // paired with in.readObject() so the receiver gets a freshly constructed TaskLocation
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- taskImmutableInformation = IOUtil.readData(in);
- slotProfile = in.readObject();
+ taskLocation = in.readObject(); // the no-arg ctor leaves taskLocation null, so calling taskLocation.readData(in) here would throw NPE
+ }
+
+ @Override
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.CLOSE_REQUEST_TYPE; // NOTE(review): looks copy-pasted; this id must map to GetTaskGroupAddressOperation in TaskDataSerializerHook's factory, otherwise the receiver instantiates the wrong operation - confirm/assign a dedicated id
}
}