You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/01 09:56:01 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][ResourceManager] Add ResourceManager (#2472)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 176e4ac75 [Engine][ResourceManager] Add ResourceManager (#2472)
176e4ac75 is described below
commit 176e4ac753ce9803f1c634593a6f7bb827ce775a
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Sep 1 17:55:56 2022 +0800
[Engine][ResourceManager] Add ResourceManager (#2472)
* [Engine][ResourceManager] add resource manager interface
* [Engine][ResourceManager] add resource manager interface
* [Engine][ResourceManager] Add SlotService
* [Engine][ResourceManager] Add seatunnel resource manager and slot service
* [Engine][Task] Add deploy on master
* [Engine][ResourceManager] Add resource manager and slot service
* [Engine][ResourceManager] Add resource manager and slot service
* [Engine][ResourceManager] Add volatile for state
* [Engine][ResourceManager] Add lazy load from ResourceManager
* [Engine][ResourceManager] Fix bugs
* [Engine][ResourceManager] Fix merge problems
* [Engine][ResourceManager] Fix checkstyle
* [Engine][ResourceManager] add UT and improve encapsulation of the slot
* [Engine][ResourceManager] fix UT error
* [Engine][ResourceManager] change WorkerTaskLocation to TaskLocation
* [Engine][ResourceManager] fix close bug
* [Engine][ResourceManager] fix review problems
* [Engine][ResourceManager] fix review problems
* [Engine][ResourceManager] remove HeartbeatManager and use hazelcast member service
* [Engine][ResourceManager] Add reset from SlotService
---
.../engine/common/runtime/DeployType.java} | 16 +-
.../engine/common/runtime/ExecutionMode.java} | 15 +-
.../serializeable/SeaTunnelFactoryIdConstant.java | 4 +
.../seatunnel/engine/server/SeaTunnelServer.java | 54 ++++-
.../engine/server/TaskExecutionService.java | 31 ++-
.../engine/server/dag/physical/PhysicalPlan.java | 2 +-
.../engine/server/dag/physical/PhysicalVertex.java | 79 +++++---
.../engine/server/dag/physical/SubPlan.java | 11 +-
.../seatunnel/engine/server/execution/Task.java | 2 +-
.../server/execution/TaskExecutionContext.java | 10 +-
.../seatunnel/engine/server/master/JobMaster.java | 7 +-
.../resourcemanager/AbstractResourceManager.java | 162 +++++++++++++++
...Manager.java => NoEnoughResourceException.java} | 14 +-
.../server/resourcemanager/ResourceManager.java | 34 +++-
.../resourcemanager/ResourceManagerFactory.java | 51 +++++
.../resourcemanager/ResourceRequestHandler.java | 194 ++++++++++++++++++
.../resourcemanager/SimpleResourceManager.java | 70 -------
...Manager.java => StandaloneResourceManager.java} | 13 +-
...er.java => UnsupportedDeployTypeException.java} | 13 +-
.../opeartion/ReleaseSlotOperation.java} | 42 ++--
.../opeartion/RequestSlotOperation.java} | 49 +++--
.../opeartion/ResetResourceOperation.java | 50 +++++
.../opeartion/WorkerHeartbeatOperation.java} | 38 ++--
.../{ResourceManager.java => resource/CPU.java} | 28 ++-
.../{ResourceManager.java => resource/Memory.java} | 28 ++-
.../Resource.java} | 16 +-
.../resourcemanager/resource/ResourceProfile.java | 74 +++++++
.../resourcemanager/resource/SlotProfile.java | 84 ++++++++
.../CreateWorkerResult.java} | 16 +-
.../ThirdPartyResourceManager.java} | 16 +-
.../kubernetes/KubernetesResourceManager.java | 44 +++++
.../thirdparty/yarn/YarnResourceManager.java | 43 ++++
.../resourcemanager/worker/WorkerProfile.java | 51 +++++
.../server/scheduler/PipelineBaseScheduler.java | 106 ++++++----
.../serializable/ResourceDataSerializerHook.java | 75 +++++++
.../server/service/slot/DefaultSlotService.java | 217 +++++++++++++++++++++
.../slot/SlotAndWorkerProfile.java} | 32 +--
.../Task.java => service/slot/SlotContext.java} | 32 ++-
.../slot/SlotService.java} | 23 ++-
.../slot/WrongTargetSlotException.java} | 16 +-
.../server/task/SourceSplitEnumeratorTask.java | 2 +-
.../server/task/operation/DeployTaskOperation.java | 10 +-
.../task/operation/sink/SinkRegisterOperation.java | 7 +-
.../operation/sink/SinkUnregisterOperation.java | 2 +-
.../operation/source/AssignSplitOperation.java | 4 +-
.../operation/source/CloseRequestOperation.java | 4 +-
.../operation/source/RequestSplitOperation.java | 4 +-
.../source/SourceNoMoreElementOperation.java | 3 +-
.../operation/source/SourceRegisterOperation.java | 3 +-
.../services/com.hazelcast.DataSerializerHook | 3 +-
.../resourcemanager/ResourceManagerTest.java | 68 +++++++
51 files changed, 1582 insertions(+), 390 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/DeployType.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/DeployType.java
index cb459944c..7c61c126c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/DeployType.java
@@ -15,16 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.common.runtime;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
-
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+public enum DeployType {
+ STANDALONE,
+ YARN,
+ KUBERNETES
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/ExecutionMode.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/ExecutionMode.java
index cb459944c..061d0c1c8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/ExecutionMode.java
@@ -15,16 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.common.runtime;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
-
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+public enum ExecutionMode {
+ LOCAL,
+ CLUSTER
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
index 0f2470495..f65c95e62 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
@@ -43,4 +43,8 @@ public final class SeaTunnelFactoryIdConstant {
public static final String SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY =
"hazelcast.serialization.ds.seatunnel.engine.task";
public static final int SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID = -30004;
+
+ public static final String SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY =
+ "hazelcast.serialization.ds.seatunnel.engine.resource";
+ public static final int SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID = -30005;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 08560c7db..7c912f304 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -22,6 +22,10 @@ import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
+import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
+import org.apache.seatunnel.engine.server.service.slot.SlotService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.instance.impl.Node;
@@ -53,9 +57,11 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
private final ILogger logger;
private final LiveOperationRegistry liveOperationRegistry;
+ private volatile SlotService slotService;
private TaskExecutionService taskExecutionService;
private final ExecutorService executorService;
+ private volatile ResourceManager resourceManager;
private final SeaTunnelConfig seaTunnelConfig;
@@ -75,8 +81,20 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
logger.info("SeaTunnel server start...");
}
- public TaskExecutionService getTaskExecutionService() {
- return this.taskExecutionService;
+ /**
+ * Lazy load for Slot Service
+ */
+ public SlotService getSlotService() {
+ if (slotService == null) {
+ synchronized (this) {
+ if (slotService == null) {
+ SlotService service = new DefaultSlotService(nodeEngine, taskExecutionService, true, 2);
+ service.init();
+ slotService = service;
+ }
+ }
+ }
+ return slotService;
}
public JobMaster getJobMaster(Long jobId) {
@@ -86,10 +104,12 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
@Override
public void init(NodeEngine engine, Properties hzProperties) {
this.nodeEngine = (NodeEngineImpl) engine;
+ // TODO Determine whether to execute these methods on the master node according to the deploy type
taskExecutionService = new TaskExecutionService(
nodeEngine, nodeEngine.getProperties()
);
taskExecutionService.start();
+ getSlotService();
}
@Override
@@ -99,6 +119,12 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
@Override
public void shutdown(boolean terminate) {
+ if (slotService != null) {
+ slotService.close();
+ }
+ if (resourceManager != null) {
+ resourceManager.close();
+ }
taskExecutionService.shutdown();
}
@@ -109,7 +135,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
@Override
public void memberRemoved(MembershipServiceEvent event) {
-
+ resourceManager.memberRemoved(event);
}
@Override
@@ -129,12 +155,32 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
return liveOperationRegistry;
}
+ /**
+ * Lazy load for resource manager
+ */
+ public ResourceManager getResourceManager() {
+ if (resourceManager == null) {
+ synchronized (this) {
+ if (resourceManager == null) {
+ ResourceManager manager = new ResourceManagerFactory(nodeEngine).getResourceManager();
+ manager.init();
+ resourceManager = manager;
+ }
+ }
+ }
+ return resourceManager;
+ }
+
+ public TaskExecutionService getTaskExecutionService() {
+ return taskExecutionService;
+ }
+
/**
* call by client to submit job
*/
public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
- JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService);
+ JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService, getResourceManager());
executorService.submit(() -> {
try {
jobMaster.init();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 03ea4d53e..82a6bbf52 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -36,8 +36,10 @@ import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
import org.apache.seatunnel.engine.server.operation.NotifyTaskStatusOperation;
+import org.apache.seatunnel.engine.server.service.slot.SlotContext;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import com.google.common.collect.Lists;
@@ -83,6 +85,7 @@ public class TaskExecutionService {
// key: TaskID
private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
+ private SlotContext slotContext;
public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -99,6 +102,10 @@ public class TaskExecutionService {
executorService.shutdownNow();
}
+ public void setSlotContext(SlotContext slotContext) {
+ this.slotContext = slotContext;
+ }
+
public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
return executionContexts.get(taskGroupLocation);
}
@@ -125,14 +132,21 @@ public class TaskExecutionService {
uncheckRun(startedLatch::await);
}
+ public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(@NonNull Data taskImmutableInformation) {
+ TaskGroupImmutableInformation taskImmutableInfo =
+ nodeEngine.getSerializationService().toObject(taskImmutableInformation);
+ return deployTask(taskImmutableInfo);
+ }
+
+ public <T extends Task> T getTask(TaskLocation taskLocation) {
+ return this.getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup().getTask(taskLocation.getTaskID());
+ }
+
public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(
- @NonNull Data taskImmutableInformation
- ) {
+ @NonNull TaskGroupImmutableInformation taskImmutableInfo) {
CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
TaskGroup taskGroup = null;
try {
- TaskGroupImmutableInformation taskImmutableInfo =
- nodeEngine.getSerializationService().toObject(taskImmutableInformation);
Set<URL> jars = taskImmutableInfo.getJars();
if (!CollectionUtils.isEmpty(jars)) {
@@ -171,10 +185,11 @@ public class TaskExecutionService {
ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap = new ConcurrentHashMap<>();
final Map<Boolean, List<Task>> byCooperation =
tasks.stream()
- .peek(x -> {
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext(x, nodeEngine);
- x.setTaskExecutionContext(taskExecutionContext);
- taskExecutionContextMap.put(x.getTaskID(), taskExecutionContext);
+ .peek(task -> {
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext(task, nodeEngine,
+ slotContext);
+ task.setTaskExecutionContext(taskExecutionContext);
+ taskExecutionContextMap.put(task.getTaskID(), taskExecutionContext);
})
.collect(partitioningBy(Task::isThreadsShare));
submitThreadShareTask(executionTracker, byCooperation.get(true));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 5a1daecf4..6fd0fe132 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -88,7 +88,7 @@ public class PhysicalPlan {
}
public void initStateFuture() {
- pipelineList.stream().forEach(subPlan -> {
+ pipelineList.forEach(subPlan -> {
PassiveCompletableFuture<PipelineState> future = subPlan.initStateFuture();
future.whenComplete((v, t) -> {
// We need not handle t, Because we will not return t from Pipeline
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index d091fa330..08ab71c15 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -22,11 +22,13 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
@@ -43,6 +45,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
/**
* PhysicalVertex is responsible for the scheduling and execution of a single task parallel
@@ -94,11 +97,6 @@ public class PhysicalVertex {
*/
private final long[] stateTimestamps;
- /**
- * This future only can completion by the task run in {@link com.hazelcast.spi.impl.executionservice.ExecutionService }
- */
- private PassiveCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
-
private final JobImmutableInformation jobImmutableInformation;
private final long initializationTimestamp;
@@ -107,8 +105,6 @@ public class PhysicalVertex {
private Address currentExecutionAddress;
- private TaskGroupImmutableInformation taskGroupImmutableInformation;
-
public PhysicalVertex(long physicalVertexId,
int subTaskGroupIndex,
@NonNull ExecutorService executorService,
@@ -155,23 +151,33 @@ public class PhysicalVertex {
return new PassiveCompletableFuture<>(this.taskFuture);
}
+ public void deployOnMaster() {
+ currentExecutionAddress = nodeEngine.getMasterAddress();
+ deployInternal(taskGroupImmutableInformation -> {
+ SeaTunnelServer server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+ return new PassiveCompletableFuture<>(server.getTaskExecutionService()
+ .deployTask(taskGroupImmutableInformation));
+ });
+ }
+
@SuppressWarnings("checkstyle:MagicNumber")
// This method must not throw an exception
- public void deploy(@NonNull Address address) {
- currentExecutionAddress = address;
- taskGroupImmutableInformation = new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
- nodeEngine.getSerializationService().toData(this.taskGroup),
- this.pluginJarsUrls);
+ public void deploy(@NonNull SlotProfile slotProfile) {
+ currentExecutionAddress = slotProfile.getWorker();
+ deployInternal(taskGroupImmutableInformation -> new PassiveCompletableFuture<>(
+ nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+ new DeployTaskOperation(slotProfile,
+ nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+ slotProfile.getWorker())
+ .invoke()));
+ }
+ private void deployInternal(Function<TaskGroupImmutableInformation, PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
+ TaskGroupImmutableInformation taskGroupImmutableInformation = getTaskGroupImmutableInformation();
+ PassiveCompletableFuture<TaskExecutionState> completeFuture;
try {
if (ExecutionState.DEPLOYING.equals(executionState.get())) {
- waitForCompleteByExecutionService = new PassiveCompletableFuture<>(
- nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- new DeployTaskOperation(
- nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
- currentExecutionAddress)
- .invoke());
-
+ completeFuture = deployMethod.apply(taskGroupImmutableInformation);
// may be canceling
if (!updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
// If we found the task state turned to CANCELING after deployed to TaskExecutionService. We need
@@ -188,29 +194,42 @@ public class PhysicalVertex {
this.getTaskFullName(), executionState.get()))));
}
}
+ monitorTask(completeFuture);
} else if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
turnToEndState(ExecutionState.CANCELED);
taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(), null));
} else {
turnToEndState(ExecutionState.FAILED);
taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(),
- new JobException(String.format("%s turn to a unexpected state"))));
+ new JobException(String.format("%s turn to a unexpected state", getTaskFullName()))));
}
} catch (Throwable th) {
- LOGGER.severe(String.format("%s deploy error with Exception: %s",
- this.taskFullName,
- ExceptionUtils.getMessage(th)));
- turnToEndState(ExecutionState.FAILED);
- taskFuture.complete(
- new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, th));
+ failedByException(th);
}
+ }
- if (waitForCompleteByExecutionService == null) {
- return;
- }
+ private void failedByException(Throwable th) {
+ LOGGER.severe(String.format("%s deploy error with Exception: %s",
+ this.taskFullName,
+ ExceptionUtils.getMessage(th)));
+ turnToEndState(ExecutionState.FAILED);
+ taskFuture.complete(
+ new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, th));
+ }
- waitForCompleteByExecutionService.whenComplete((v, t) -> {
+ private TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
+ return new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
+ nodeEngine.getSerializationService().toData(this.taskGroup),
+ this.pluginJarsUrls);
+ }
+
+ /**
+ * @param completeFuture This future can only be completed by the task running in
+ * {@link com.hazelcast.spi.impl.executionservice.ExecutionService }
+ */
+ private void monitorTask(PassiveCompletableFuture<TaskExecutionState> completeFuture) {
+ completeFuture.whenComplete((v, t) -> {
try {
if (t != null) {
LOGGER.severe("An unexpected error occurred while the task was running", t);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index dc7974518..ea11a93d0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public class SubPlan {
@@ -96,11 +97,11 @@ public class SubPlan {
}
public PassiveCompletableFuture<PipelineState> initStateFuture() {
- physicalVertexList.stream().forEach(m -> {
+ physicalVertexList.forEach(m -> {
addPhysicalVertexCallBack(m.initStateFuture());
});
- coordinatorVertexList.stream().forEach(m -> {
+ coordinatorVertexList.forEach(m -> {
addPhysicalVertexCallBack(m.initStateFuture());
});
@@ -136,11 +137,15 @@ public class SubPlan {
turnToEndState(PipelineState.FINISHED);
LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
}
- pipelineFuture.complete(pipelineState.get());
+ this.pipelineFuture.complete(pipelineState.get());
}
});
}
+ public void whenComplete(BiConsumer<? super PipelineState, ? super Throwable> action) {
+ this.pipelineFuture.whenComplete(action);
+ }
+
private void turnToEndState(@NonNull PipelineState endState) {
// consistency check
if (pipelineState.get().isEndState()) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index d350019b7..8ec2796fe 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -40,7 +40,7 @@ public interface Task extends Serializable {
default void close() throws IOException {
}
- default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext){
+ default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 8e395cbe2..bab9db433 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.execution;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.service.slot.SlotContext;
import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
@@ -31,9 +32,12 @@ public class TaskExecutionContext {
private final Task task;
private final NodeEngineImpl nodeEngine;
- public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine) {
+ private final SlotContext slotContext;
+
+ public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine, SlotContext slotContext) {
this.task = task;
this.nodeEngine = nodeEngine;
+ this.slotContext = slotContext;
}
public <E> InvocationFuture<E> sendToMaster(Operation operation) {
@@ -53,4 +57,8 @@ public class TaskExecutionContext {
public <T> T getTask() {
return (T) task;
}
+
+ public SlotContext getSlotContext() {
+ return slotContext;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index fa517e500..536786dec 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -29,7 +29,6 @@ import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
-import org.apache.seatunnel.engine.server.resourcemanager.SimpleResourceManager;
import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
@@ -69,14 +68,14 @@ public class JobMaster implements Runnable {
public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
- @NonNull ExecutorService executorService) {
+ @NonNull ExecutorService executorService, @NonNull ResourceManager resourceManager) {
this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
this.executorService = executorService;
flakeIdGenerator =
- this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
+ this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
- this.resourceManager = new SimpleResourceManager(this.nodeEngine);
+ this.resourceManager = resourceManager;
}
public void init() throws Exception {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
new file mode 100644
index 000000000..a77068747
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.internal.services.MembershipServiceEvent;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+ private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
+ private static final ILogger LOGGER = Logger.getLogger(AbstractResourceManager.class);
+
+ protected final ConcurrentMap<String, WorkerProfile> registerWorker;
+
+ private final NodeEngine nodeEngine;
+
+ private final ExecutionMode mode = ExecutionMode.LOCAL;
+
+ public AbstractResourceManager(NodeEngine nodeEngine) {
+ this.registerWorker = new ConcurrentHashMap<>(); // starts empty; filled by heartbeat(), pruned by memberRemoved()
+ this.nodeEngine = nodeEngine; // used by sendToMember() to invoke operations on cluster members
+ }
+
+ @Override
+ public void init() { // intentionally empty: this base class performs no initialization
+ }
+
+ /**
+  * Applies for a single slot by delegating to {@link #applyResources(long, List)} with a
+  * one-element request and unwrapping the single result.
+  *
+  * @param jobId           id of the job the slot is requested for
+  * @param resourceProfile the resource requirement of the slot
+  * @return a future holding the granted slot, or completed exceptionally with the batch request's error
+  */
+ @Override
+ public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile) throws NoEnoughResourceException {
+     final CompletableFuture<SlotProfile> singleSlot = new CompletableFuture<>();
+     final List<ResourceProfile> request = Collections.singletonList(resourceProfile);
+     applyResources(jobId, request).whenComplete((slots, failure) -> {
+         if (failure == null) {
+             singleSlot.complete(slots.get(0));
+             return;
+         }
+         singleSlot.completeExceptionally(failure);
+     });
+     return singleSlot;
+ }
+
+ /**
+  * Blocks the calling thread until at least one worker is present in {@code registerWorker},
+  * polling every {@code DEFAULT_WORKER_CHECK_INTERVAL} milliseconds. Only applies when the
+  * execution mode is {@link ExecutionMode#LOCAL}; otherwise it returns immediately.
+  *
+  * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted;
+  *                          the thread's interrupt flag is restored before throwing
+  */
+ private void waitingWorkerRegister() {
+     if (!ExecutionMode.LOCAL.equals(mode)) {
+         return;
+     }
+     // Local mode, should wait worker(master node) register.
+     try {
+         while (registerWorker.isEmpty()) {
+             LOGGER.info("waiting current worker register to resource manager...");
+             Thread.sleep(DEFAULT_WORKER_CHECK_INTERVAL);
+         }
+     } catch (InterruptedException e) {
+         // Restore the interrupt flag so code further up the stack can still observe the interruption.
+         Thread.currentThread().interrupt();
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Drops the worker entry keyed by the removed member's address string and logs the removal.
+  *
+  * @param event the membership event describing the member that left
+  */
+ @Override
+ public void memberRemoved(MembershipServiceEvent event) {
+     final Address removedAddress = event.getMember().getAddress();
+     final String workerKey = removedAddress.toString();
+     LOGGER.severe("Node heartbeat timeout, disconnected for resource manager. NodeID: " + workerKey);
+     registerWorker.remove(workerKey);
+ }
+
+ @Override
+ public CompletableFuture<List<SlotProfile>> applyResources(long jobId,
+ List<ResourceProfile> resourceProfile) throws NoEnoughResourceException {
+ waitingWorkerRegister(); // in LOCAL mode this blocks the caller until a worker is registered
+ return new ResourceRequestHandler(jobId, resourceProfile, registerWorker, this).request(); // one handler instance per request batch
+ }
+
+ protected boolean supportDynamicWorker() { // consulted by ResourceRequestHandler when some requested slots could not be obtained
+ return false; // default: dynamic workers are not supported
+ }
+
+ /**
+ * Finds a new worker through a third-party resource manager; intended to return only after the new worker has registered successfully.
+ * This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param resourceProfiles the resource profiles the new worker should be able to provide
+ */
+ protected void findNewWorker(List<ResourceProfile> resourceProfiles) {
+ throw new UnsupportedOperationException("Unsupported operation to find new worker in " + this.getClass().getName());
+ }
+
+ @Override
+ public void close() { // intentionally empty: this base class releases nothing on close
+ }
+
+ /**
+  * Invokes {@code operation} on the member at {@code address} through the SeaTunnel service.
+  *
+  * @param operation the operation to invoke
+  * @param address   the address of the target member
+  * @param <E>       the type of the operation's response
+  * @return the future returned by the invocation
+  */
+ protected <E> InvocationFuture<E> sendToMember(Operation operation, Address address) {
+     return nodeEngine.getOperationService()
+         .createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, address)
+         .invoke();
+ }
+
+ /**
+  * Releases every given slot and aggregates the individual results.
+  *
+  * @param jobId    id of the job that held the slots
+  * @param profiles the slots to release
+  * @return a future that completes once all releases have completed, or exceptionally if any failed
+  */
+ @Override
+ public CompletableFuture<Void> releaseResources(long jobId, List<SlotProfile> profiles) {
+     final List<CompletableFuture<Void>> pending = new ArrayList<>();
+     profiles.forEach(slot -> pending.add(releaseResource(jobId, slot)));
+
+     final CompletableFuture<Void> allReleased = new CompletableFuture<>();
+     CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).whenComplete((ignored, failure) -> {
+         if (failure == null) {
+             allReleased.complete(null);
+             return;
+         }
+         allReleased.completeExceptionally(failure);
+     });
+     return allReleased;
+ }
+
+ @Override
+ public CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile) {
+ return sendToMember(new ReleaseSlotOperation(jobId, profile), profile.getWorker()); // the release is sent to the address returned by profile.getWorker()
+ }
+
+ @Override
+ public void heartbeat(WorkerProfile workerProfile) {
+ if (!registerWorker.containsKey(workerProfile.getWorkerID())) { // first report seen for this worker ID
+ LOGGER.info("received new worker register: " + workerProfile.getAddress());
+ sendToMember(new ResetResourceOperation(), workerProfile.getAddress()).join(); // blocks until the reset operation has completed on the worker
+ } else {
+ LOGGER.fine("received worker heartbeat from: " + workerProfile.getAddress());
+ }
+ registerWorker.put(workerProfile.getWorkerID(), workerProfile); // always keep the most recent profile
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/NoEnoughResourceException.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/NoEnoughResourceException.java
index cb459944c..a4f5a2bff 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/NoEnoughResourceException.java
@@ -17,14 +17,12 @@
package org.apache.seatunnel.engine.server.resourcemanager;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+public class NoEnoughResourceException extends RuntimeException {
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+ public NoEnoughResourceException() {
+ }
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+ public NoEnoughResourceException(String message) {
+ super(message);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index cb459944c..c8fa75c84 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -17,14 +17,36 @@
package org.apache.seatunnel.engine.server.resourcemanager;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+import com.hazelcast.internal.services.MembershipServiceEvent;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+ void init();
+
+ CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile) throws NoEnoughResourceException;
+
+ CompletableFuture<List<SlotProfile>> applyResources(long jobId, List<ResourceProfile> resourceProfile) throws NoEnoughResourceException;
+
+ CompletableFuture<Void> releaseResources(long jobId, List<SlotProfile> profiles);
+
+ CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile);
+
+ /**
+ * Every time ResourceManager and Worker communicate, heartbeat method should be called to
+ * record the latest Worker status
+ *
+ * @param workerProfile the worker's current profile
+ */
+ void heartbeat(WorkerProfile workerProfile);
+
+ void memberRemoved(MembershipServiceEvent event);
+
+ void close();
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFactory.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFactory.java
new file mode 100644
index 000000000..82e968f53
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.DeployType;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.kubernetes.KubernetesResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.yarn.YarnResourceManager;
+
+import com.hazelcast.spi.impl.NodeEngine;
+
+public class ResourceManagerFactory {
+
+
+ private final NodeEngine nodeEngine;
+
+ public ResourceManagerFactory(NodeEngine nodeEngine) {
+ this.nodeEngine = nodeEngine;
+ }
+
+ public ResourceManager getResourceManager(DeployType type) {
+ if (DeployType.STANDALONE.equals(type)) {
+ return new StandaloneResourceManager(nodeEngine);
+ } else if (DeployType.KUBERNETES.equals(type)) {
+ return new KubernetesResourceManager(nodeEngine);
+ } else if (DeployType.YARN.equals(type)) {
+ return new YarnResourceManager(nodeEngine);
+ } else {
+ throw new UnsupportedDeployTypeException(type);
+ }
+ }
+
+ public ResourceManager getResourceManager() {
+ return this.getResourceManager(DeployType.STANDALONE);
+ }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
new file mode 100644
index 000000000..a8fb6aeb1
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.DeployType;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Handle each slot request from resource manager
+ */
+public class ResourceRequestHandler {
+
+ private static final ILogger LOGGER = Logger.getLogger(ResourceRequestHandler.class);
+ private final CompletableFuture<List<SlotProfile>> completableFuture;
+ /*
+ * Caches the slots that have been requested successfully so far, keyed by the index of the
+ * corresponding entry in {@link resourceProfile}. An index with no entry means the resource
+ * profile at that index has not been granted a slot yet (the request failed or has not finished).
+ */
+ private final ConcurrentMap<Integer, SlotProfile> resultSlotProfiles;
+ private final ConcurrentMap<String, WorkerProfile> registerWorker;
+
+ private final long jobId;
+
+ private final List<ResourceProfile> resourceProfile;
+
+ private final AbstractResourceManager resourceManager;
+
+ public ResourceRequestHandler(long jobId,
+ List<ResourceProfile> resourceProfile,
+ ConcurrentMap<String, WorkerProfile> registerWorker,
+ AbstractResourceManager resourceManager) {
+ this.completableFuture = new CompletableFuture<>(); // overall result returned by request()
+ this.resultSlotProfiles = new ConcurrentHashMap<>(); // request index -> granted slot
+ this.jobId = jobId;
+ this.resourceProfile = resourceProfile;
+ this.registerWorker = registerWorker; // the map passed in by the resource manager, not a copy
+ this.resourceManager = resourceManager;
+ }
+
+ /**
+  * Sends a slot request for every resource profile that some registered worker appears able
+  * to serve, then resolves the overall request once all of those requests have finished.
+  * <p>
+  * If any slot request fails, the overall future is completed exceptionally and already
+  * granted slots are released. If some profiles remain without a slot, a dynamic worker is
+  * requested when the resource manager supports it; otherwise the request fails with
+  * {@link NoEnoughResourceException}.
+  *
+  * @return the future holding the granted slots, in the order of the requested profiles
+  */
+ public CompletableFuture<List<SlotProfile>> request() {
+     List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new ArrayList<>();
+     for (int i = 0; i < resourceProfile.size(); i++) {
+         ResourceProfile r = resourceProfile.get(i);
+         Optional<WorkerProfile> workerProfile = preCheckWorkerResource(r);
+         if (workerProfile.isPresent()) {
+             // request slot to member
+             allRequestFuture.add(singleResourceRequestToMember(i, r, workerProfile.get()));
+         }
+     }
+     // all resource preCheck done, also had sent request to worker
+     getAllOfFuture(allRequestFuture).whenComplete((unused, error) -> {
+         if (error != null) {
+             // Fail exactly once and stop here: falling through would release and fail a second
+             // time, or start a dynamic-worker request for a request that has already failed.
+             completeRequestWithException(error);
+             return;
+         }
+         if (resultSlotProfiles.size() < resourceProfile.size()) {
+             // meaning have some slot not request success
+             if (resourceManager.supportDynamicWorker()) {
+                 applyByDynamicWorker();
+             } else {
+                 completeRequestWithException(new NoEnoughResourceException("can't apply resource request: " + resourceProfile.get(findNullIndexInResultSlotProfiles())));
+             }
+         }
+     });
+     return completableFuture;
+ }
+
+ /**
+  * Returns the lowest request index that has no granted slot yet, or {@code -1} if every
+  * index has one.
+  */
+ private int findNullIndexInResultSlotProfiles() {
+     int candidate = 0;
+     while (candidate < resourceProfile.size()) {
+         if (!resultSlotProfiles.containsKey(candidate)) {
+             return candidate;
+         }
+         candidate++;
+     }
+     return -1;
+ }
+
+ private void completeRequestWithException(Throwable e) { // fails the overall request future with e
+ releaseAllResourceInternal(); // first give back the slots already granted for this request
+ completableFuture.completeExceptionally(e);
+ }
+
+ private void addSlotToCacheMap(int index, SlotProfile slotProfile) {
+ if (null != slotProfile) {
+ resultSlotProfiles.put(index, slotProfile);
+ if (resultSlotProfiles.size() == resourceProfile.size()) {
+ List<SlotProfile> value = new ArrayList<>();
+ for (int i = 0; i < resultSlotProfiles.size(); i++) {
+ value.add(resultSlotProfiles.get(i));
+ }
+ completableFuture.complete(value);
+ }
+ }
+ }
+
+ private CompletableFuture<SlotAndWorkerProfile> singleResourceRequestToMember(int i, ResourceProfile r, WorkerProfile workerProfile) {
+ InvocationFuture<SlotAndWorkerProfile> future = resourceManager.sendToMember(new RequestSlotOperation(jobId, r), workerProfile.getAddress()); // ask the chosen worker for a slot
+ return future.whenComplete(
+ (slotAndWorkerProfile, error) -> {
+ if (error != null) {
+ throw new RuntimeException(error);
+ } else {
+ resourceManager.heartbeat(slotAndWorkerProfile.getWorkerProfile()); // refresh the manager's view of the worker with the profile carried in the response
+ addSlotToCacheMap(i, slotAndWorkerProfile.getSlotProfile()); // a null slot profile is ignored by addSlotToCacheMap
+ }
+ }
+ );
+ }
+
+ /**
+  * Picks a registered worker that looks able to serve the given resource profile, based on
+  * the worker profiles currently known to the resource manager.
+  *
+  * @param r the requested resource profile
+  * @return a candidate worker, or empty if no registered worker qualifies
+  */
+ private Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
+     // First choice: a worker that still has an unassigned slot satisfying the request.
+     Optional<WorkerProfile> withFreeSlot = registerWorker.values().stream()
+         .filter(worker -> Arrays.stream(worker.getUnassignedSlots())
+             .anyMatch(slot -> slot.getResourceProfile().enoughThan(r)))
+         .findAny();
+     if (withFreeSlot.isPresent()) {
+         return withFreeSlot;
+     }
+     // Fallback: a worker whose unassigned resource satisfies the request.
+     return registerWorker.values().stream()
+         .filter(worker -> worker.getUnassignedResource().enoughThan(r))
+         .findAny();
+ }
+
+ /**
+  * When the {@link DeployType} supports dynamic workers and the resources of the current worker
+  * cannot meet the requirements of resource application, we can dynamically request the third-party
+  * resource management to create a new worker, and then complete the resource application.
+  * <p>
+  * Every index of {@code resourceProfile} that has no entry in {@code resultSlotProfiles} is
+  * requested again; any failure completes the overall request exceptionally.
+  */
+ private void applyByDynamicWorker() {
+     List<ResourceProfile> needApplyResource = new ArrayList<>();
+     List<Integer> needApplyIndex = new ArrayList<>();
+     // Iterate over the requested profiles, not over the granted slots: the result map is
+     // smaller than the request list at this point, so bounding the loop by its size would
+     // skip missing indexes at the tail.
+     for (int i = 0; i < resourceProfile.size(); i++) {
+         if (!resultSlotProfiles.containsKey(i)) {
+             needApplyResource.add(resourceProfile.get(i));
+             needApplyIndex.add(i);
+         }
+     }
+     try {
+         resourceManager.findNewWorker(needApplyResource);
+         resourceManager.applyResources(jobId, needApplyResource).whenComplete((s, e) -> {
+             if (e != null) {
+                 completeRequestWithException(e);
+                 return;
+             }
+             for (int i = 0; i < s.size(); i++) {
+                 addSlotToCacheMap(needApplyIndex.get(i), s.get(i));
+             }
+         });
+     } catch (RuntimeException failure) {
+         // This method is called from a future callback, where a thrown exception would be
+         // lost and the request future would never complete; report it through the future.
+         completeRequestWithException(failure);
+     }
+ }
+
+ /**
+  * Asks the resource manager to release every slot granted so far for this request.
+  */
+ private void releaseAllResourceInternal() {
+     LOGGER.warning("apply resource not success, release all already applied resource");
+     for (SlotProfile acquired : resultSlotProfiles.values()) {
+         if (acquired != null) {
+             resourceManager.releaseResource(jobId, acquired);
+         }
+     }
+ }
+
+ private <T> CompletableFuture<T> getAllOfFuture(List<CompletableFuture<T>> allRequestFuture) {
+ return (CompletableFuture<T>) CompletableFuture.allOf(allRequestFuture.toArray(new CompletableFuture[0])); // NOTE(review): allOf yields CompletableFuture<Void>; the unchecked cast is only harmless because the caller ignores the value
+ }
+
+}
+
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
deleted file mode 100644
index 28bbb81d1..000000000
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.resourcemanager;
-
-import org.apache.seatunnel.engine.common.exception.JobException;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-
-import com.hazelcast.cluster.Address;
-import com.hazelcast.spi.impl.NodeEngine;
-import lombok.Data;
-import lombok.NonNull;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Data
-public class SimpleResourceManager implements ResourceManager {
-
- // TODO We may need more detailed resource define, instead of the resource definition method of only Address.
- private Map<Long, Map<TaskGroupLocation, Address>> physicalVertexIdAndResourceMap = new HashMap<>();
-
- private final NodeEngine nodeEngine;
-
- public SimpleResourceManager(NodeEngine nodeEngine) {
- this.nodeEngine = nodeEngine;
- }
-
- @SuppressWarnings("checkstyle:MagicNumber")
- @Override
- public Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation) {
- Map<TaskGroupLocation, Address> jobAddressMap =
- physicalVertexIdAndResourceMap.computeIfAbsent(jobId, k -> new HashMap<>());
-
- Address localhost =
- jobAddressMap.putIfAbsent(taskGroupLocation, nodeEngine.getThisAddress());
- if (null == localhost) {
- localhost = jobAddressMap.get(taskGroupLocation);
- }
-
- return localhost;
-
- }
-
- @Override
- @NonNull
- public Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation) {
- Map<TaskGroupLocation, Address> longAddressMap = physicalVertexIdAndResourceMap.get(jobId);
- if (null == longAddressMap || longAddressMap.isEmpty()) {
- throw new JobException(
- String.format("Job %s, Task %s can not found applied resource.", jobId, taskGroupLocation));
- }
-
- return longAddressMap.get(taskGroupLocation);
- }
-}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/StandaloneResourceManager.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/StandaloneResourceManager.java
index cb459944c..704c9ff68 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/StandaloneResourceManager.java
@@ -17,14 +17,11 @@
package org.apache.seatunnel.engine.server.resourcemanager;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import com.hazelcast.spi.impl.NodeEngine;
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+public class StandaloneResourceManager extends AbstractResourceManager {
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+ public StandaloneResourceManager(NodeEngine nodeEngine) {
+ super(nodeEngine);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/UnsupportedDeployTypeException.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/UnsupportedDeployTypeException.java
index cb459944c..a1f091a29 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/UnsupportedDeployTypeException.java
@@ -17,14 +17,11 @@
package org.apache.seatunnel.engine.server.resourcemanager;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.common.runtime.DeployType;
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+public class UnsupportedDeployTypeException extends RuntimeException {
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+ public UnsupportedDeployTypeException(DeployType type) {
+ super("Unknown deploy type: " + (type != null ? type.name() : "null"));
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
index 558593551..c969425a6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.operation.sink;
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
@@ -29,46 +28,49 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-public class SinkUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+public class ReleaseSlotOperation extends Operation implements IdentifiedDataSerializable {
- private TaskLocation currentTaskID;
- private TaskLocation committerTaskID;
+ private long jobID;
+ private SlotProfile slotProfile;
- public SinkUnregisterOperation() {
+ public ReleaseSlotOperation() {
}
- public SinkUnregisterOperation(TaskLocation currentTaskID, TaskLocation committerTaskID) {
- this.currentTaskID = currentTaskID;
- this.committerTaskID = committerTaskID;
+ public ReleaseSlotOperation(long jobID, SlotProfile slotProfile) {
+ this.jobID = jobID;
+ this.slotProfile = slotProfile;
}
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- SinkAggregatedCommitterTask<?> task =
- server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
- task.receivedWriterUnregister(currentTaskID);
+ server.getSlotService().releaseSlot(jobID, slotProfile);
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
- currentTaskID.writeData(out);
- committerTaskID.writeData(out);
+ out.writeObject(slotProfile);
+ out.writeLong(jobID);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
- currentTaskID.readData(in);
- committerTaskID.readData(in);
+ slotProfile = in.readObject();
+ jobID = in.readLong();
+ }
+
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
}
@Override
public int getFactoryId() {
- return TaskDataSerializerHook.FACTORY_ID;
+ return ResourceDataSerializerHook.FACTORY_ID;
}
@Override
public int getClassId() {
- return TaskDataSerializerHook.SINK_UNREGISTER_TYPE;
+ return ResourceDataSerializerHook.RELEASE_SLOT_TYPE;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/RequestSlotOperation.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/RequestSlotOperation.java
index 558593551..56a7971be 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/RequestSlotOperation.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.operation.sink;
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
@@ -29,46 +29,55 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-public class SinkUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+/**
+ * Hazelcast operation that asks the slot service of the SeaTunnelServer returned by getService()
+ * for a slot for the given job and resource profile. The SlotAndWorkerProfile returned by
+ * requestSlot is kept and handed back to the caller through getResponse().
+ */
+public class RequestSlotOperation extends Operation implements IdentifiedDataSerializable {
- private TaskLocation currentTaskID;
- private TaskLocation committerTaskID;
+ private ResourceProfile resourceProfile;
+ private long jobID;
+ // Set by run() and exposed through getResponse(); it is not written by writeInternal().
+ private SlotAndWorkerProfile result;
- public SinkUnregisterOperation() {
+ // NOTE(review): presumably this no-arg constructor is used when the operation is deserialized,
+ // before readInternal() fills the fields - confirm in ResourceDataSerializerHook.
+ public RequestSlotOperation() {
}
- public SinkUnregisterOperation(TaskLocation currentTaskID, TaskLocation committerTaskID) {
- this.currentTaskID = currentTaskID;
- this.committerTaskID = committerTaskID;
+ public RequestSlotOperation(long jobID, ResourceProfile resourceProfile) {
+ this.resourceProfile = resourceProfile;
+ this.jobID = jobID;
}
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- SinkAggregatedCommitterTask<?> task =
- server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
- task.receivedWriterUnregister(currentTaskID);
+ // Keep the slot service's answer so that getResponse() can return it.
+ result = server.getSlotService().requestSlot(jobID, resourceProfile);
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
- currentTaskID.writeData(out);
- committerTaskID.writeData(out);
+ // The write order (resourceProfile, then jobID) must match the read order in readInternal().
+ out.writeObject(resourceProfile);
+ out.writeLong(jobID);
+ }
+
+ @Override
+ public Object getResponse() {
+ return result;
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
- currentTaskID.readData(in);
- committerTaskID.readData(in);
+ resourceProfile = in.readObject();
+ jobID = in.readLong();
+ }
+
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
}
@Override
public int getFactoryId() {
- return TaskDataSerializerHook.FACTORY_ID;
+ return ResourceDataSerializerHook.FACTORY_ID;
}
@Override
public int getClassId() {
- return TaskDataSerializerHook.SINK_UNREGISTER_TYPE;
+ return ResourceDataSerializerHook.REQUEST_SLOT_TYPE;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ResetResourceOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ResetResourceOperation.java
new file mode 100644
index 000000000..d7321419b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ResetResourceOperation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Hazelcast operation that calls reset() on the slot service of the SeaTunnelServer
+ * returned by getService(). The class declares no fields and does not override
+ * writeInternal/readInternal, so it carries no payload of its own.
+ */
+public class ResetResourceOperation extends Operation implements IdentifiedDataSerializable {
+
+    public ResetResourceOperation() {
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
+    }
+
+    @Override
+    public void run() throws Exception {
+        // Look up the server service and reset its slot service in one expression.
+        this.<SeaTunnelServer>getService().getSlotService().reset();
+    }
+
+    @Override
+    public int getClassId() {
+        return ResourceDataSerializerHook.RESET_RESOURCE_TYPE;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return ResourceDataSerializerHook.FACTORY_ID;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/WorkerHeartbeatOperation.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/WorkerHeartbeatOperation.java
index 558593551..5ea970c47 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/WorkerHeartbeatOperation.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.operation.sink;
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
@@ -29,46 +28,45 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-public class SinkUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+/**
+ * Hazelcast operation that forwards a WorkerProfile to the resource manager of the
+ * SeaTunnelServer returned by getService(), by calling heartbeat(workerProfile).
+ * It does not override getResponse(), so no result of the heartbeat call is kept.
+ */
+public class WorkerHeartbeatOperation extends Operation implements IdentifiedDataSerializable {
- private TaskLocation currentTaskID;
- private TaskLocation committerTaskID;
+ private WorkerProfile workerProfile;
- public SinkUnregisterOperation() {
+ // NOTE(review): presumably this no-arg constructor is used when the operation is deserialized,
+ // before readInternal() fills workerProfile - confirm in ResourceDataSerializerHook.
+ public WorkerHeartbeatOperation() {
}
- public SinkUnregisterOperation(TaskLocation currentTaskID, TaskLocation committerTaskID) {
- this.currentTaskID = currentTaskID;
- this.committerTaskID = committerTaskID;
+ public WorkerHeartbeatOperation(WorkerProfile workerProfile) {
+ this.workerProfile = workerProfile;
}
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- SinkAggregatedCommitterTask<?> task =
- server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
- task.receivedWriterUnregister(currentTaskID);
+ // The profile is only handed to the resource manager; nothing is stored on this operation.
+ server.getResourceManager().heartbeat(workerProfile);
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
- currentTaskID.writeData(out);
- committerTaskID.writeData(out);
+ // The profile is the only serialized state; readInternal() reads it back with readObject().
+ out.writeObject(workerProfile);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
- currentTaskID.readData(in);
- committerTaskID.readData(in);
+ workerProfile = in.readObject();
+ }
+
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
}
@Override
public int getFactoryId() {
- return TaskDataSerializerHook.FACTORY_ID;
+ return ResourceDataSerializerHook.FACTORY_ID;
}
@Override
public int getClassId() {
- return TaskDataSerializerHook.SINK_UNREGISTER_TYPE;
+ return ResourceDataSerializerHook.WORKER_HEARTBEAT_TYPE;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/CPU.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/CPU.java
index cb459944c..6fda31285 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/CPU.java
@@ -15,16 +15,28 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+/**
+ * Resource that holds a number of CPU cores. The value is fixed at construction time and
+ * instances are obtained through {@link #of(int)}; the constructor is private.
+ * This class performs no validation of the core count itself.
+ */
+public class CPU implements Resource {
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+ private final int core;
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+ private CPU(int core) {
+ this.core = core;
+ }
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+ /** Returns the core count this instance was created with. */
+ public int getCore() {
+ return core;
+ }
+
+ /** Creates a new CPU instance holding the given core count; every call allocates a new object. */
+ public static CPU of(int core) {
+ return new CPU(core);
+ }
+
+ @Override
+ public String toString() {
+ return "CPU{" +
+ "core=" + core +
+ '}';
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Memory.java
similarity index 62%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Memory.java
index cb459944c..f94254995 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Memory.java
@@ -15,16 +15,28 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+/**
+ * Resource that holds an amount of memory expressed in bytes. The value is fixed at
+ * construction time and instances are obtained through {@link #of(long)}; the constructor
+ * is private. This class performs no validation of the byte count itself.
+ */
+public class Memory implements Resource {
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+ private final long bytes;
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+ private Memory(long bytes) {
+ this.bytes = bytes;
+ }
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+ /** Returns the byte count this instance was created with. */
+ public long getBytes() {
+ return bytes;
+ }
+
+ /** Creates a new Memory instance holding the given byte count; every call allocates a new object. */
+ public static Memory of(long bytes) {
+ return new Memory(bytes);
+ }
+
+ @Override
+ public String toString() {
+ return "Memory{" +
+ "bytes=" + bytes +
+ '}';
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Resource.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Resource.java
index cb459944c..c200e64b7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Resource.java
@@ -15,16 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import java.io.Serializable;
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
-
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+/**
+ * Marker interface for a SeaTunnel worker resource. It declares no methods of its own and
+ * only requires implementations to be {@link Serializable}.
+ */
+public interface Resource extends Serializable {
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/ResourceProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/ResourceProfile.java
new file mode 100644
index 000000000..e687718e3
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/ResourceProfile.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+
+/**
+ * Immutable pair of a {@link CPU} amount and a heap {@link Memory} amount.
+ * Both values are checked to be non-negative when a profile is constructed, so
+ * {@link #merge} and {@link #subtract} throw IllegalArgumentException when their
+ * result would contain a negative component.
+ */
+public class ResourceProfile implements Serializable {
+
+    private final CPU cpu;
+
+    private final Memory heapMemory;
+
+    /** Creates a profile with zero CPU cores and zero bytes of heap memory. */
+    public ResourceProfile() {
+        this(CPU.of(0), Memory.of(0));
+    }
+
+    /**
+     * @param cpu        CPU amount; its core count must not be negative
+     * @param heapMemory heap memory amount; its byte count must not be negative
+     * @throws IllegalArgumentException if either amount is negative
+     */
+    public ResourceProfile(CPU cpu, Memory heapMemory) {
+        checkArgument(cpu.getCore() >= 0, "The cpu core cannot be negative");
+        checkArgument(heapMemory.getBytes() >= 0, "The heapMemory bytes cannot be negative");
+        this.cpu = cpu;
+        this.heapMemory = heapMemory;
+    }
+
+    public CPU getCpu() {
+        return cpu;
+    }
+
+    public Memory getHeapMemory() {
+        return heapMemory;
+    }
+
+    /** Returns a new profile whose CPU and heap memory are the sums of this profile's and {@code other}'s. */
+    public ResourceProfile merge(ResourceProfile other) {
+        int totalCores = cpu.getCore() + other.getCpu().getCore();
+        long totalBytes = heapMemory.getBytes() + other.heapMemory.getBytes();
+        return new ResourceProfile(CPU.of(totalCores), Memory.of(totalBytes));
+    }
+
+    /** Returns a new profile whose CPU and heap memory are this profile's minus {@code other}'s. */
+    public ResourceProfile subtract(ResourceProfile other) {
+        int remainingCores = cpu.getCore() - other.getCpu().getCore();
+        long remainingBytes = heapMemory.getBytes() - other.heapMemory.getBytes();
+        return new ResourceProfile(CPU.of(remainingCores), Memory.of(remainingBytes));
+    }
+
+    /** Returns true when both the CPU and the heap memory of this profile are at least those of {@code other}. */
+    public boolean enoughThan(ResourceProfile other) {
+        if (cpu.getCore() < other.getCpu().getCore()) {
+            return false;
+        }
+        return heapMemory.getBytes() >= other.getHeapMemory().getBytes();
+    }
+
+    @Override
+    public String toString() {
+        return "ResourceProfile{cpu=" + cpu + ", heapMemory=" + heapMemory + '}';
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
new file mode 100644
index 000000000..d79fe7151
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
+
+import com.hazelcast.cluster.Address;
+
+import java.io.Serializable;
+
+/**
+ * Describes the status of a single slot: the worker address it belongs to, its id, the
+ * resources it represents and whether it is currently assigned to a job.
+ */
+public class SlotProfile implements Serializable {
+
+    private final Address worker;
+
+    private final int slotID;
+
+    // Written in assign() immediately before the volatile write to 'assigned'.
+    private long ownerJobID;
+
+    private volatile boolean assigned;
+
+    private final ResourceProfile resourceProfile;
+
+    public SlotProfile(Address worker, int slotID, ResourceProfile resourceProfile) {
+        this.worker = worker;
+        this.slotID = slotID;
+        this.resourceProfile = resourceProfile;
+    }
+
+    public Address getWorker() {
+        return worker;
+    }
+
+    public int getSlotID() {
+        return slotID;
+    }
+
+    public ResourceProfile getResourceProfile() {
+        return resourceProfile;
+    }
+
+    /**
+     * Returns the job id passed to the most recent {@link #assign(long)} call. The value is
+     * not cleared by {@link #unassigned()} and is 0 (the field default) before the first assignment.
+     */
+    public long getOwnerJobID() {
+        return ownerJobID;
+    }
+
+    /**
+     * Marks this slot as assigned to the given job.
+     *
+     * @param jobID id of the job that takes the slot
+     * @throws UnsupportedOperationException if the slot is already assigned; the message names
+     *                                       the slot, the current owner and the rejected job
+     */
+    public void assign(long jobID) {
+        if (assigned) {
+            // Same exception type as before; the message now says which slot and jobs collided.
+            throw new UnsupportedOperationException(
+                "Slot " + slotID + " on worker " + worker + " is already assigned to job " + ownerJobID
+                    + ", cannot assign it to job " + jobID);
+        }
+        ownerJobID = jobID;
+        assigned = true;
+    }
+
+    /** Marks this slot as free again. The recorded owner job id is left unchanged. */
+    public void unassigned() {
+        assigned = false;
+    }
+
+    @Override
+    public String toString() {
+        return "SlotProfile{" +
+            "worker=" + worker +
+            ", slotID=" + slotID +
+            ", ownerJobID=" + ownerJobID +
+            ", assigned=" + assigned +
+            ", resourceProfile=" + resourceProfile +
+            '}';
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/CreateWorkerResult.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/CreateWorkerResult.java
index cb459944c..9181b9b9f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/CreateWorkerResult.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.thirdparty;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+/**
+ * Result type for a worker-creation request, holding a message, a WorkerProfile and an error.
+ * NOTE(review): all three fields are private and this class declares no constructor, setters
+ * or getters, so as written they cannot be populated or read from outside - presumably a
+ * placeholder to be completed later; confirm.
+ */
+public class CreateWorkerResult {
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+ private String message;
+
+ private WorkerProfile workerProfile;
+
+ private Throwable error;
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
index cb459944c..914562725 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.thirdparty;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+import java.util.concurrent.CompletableFuture;
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+/**
+ * Contract for a resource manager that can create and release workers asynchronously.
+ * NOTE(review): judging by the package name this is meant for external platforms - confirm.
+ */
+public interface ThirdPartyResourceManager {
+
+ /**
+ * Asynchronously requests a new worker for the given resource profile.
+ *
+ * @param resourceProfile presumably the resources the new worker should provide - confirm
+ * @return a future that yields the CreateWorkerResult of the request
+ */
+ CompletableFuture<CreateWorkerResult> createNewWorker(ResourceProfile resourceProfile);
+
+ /**
+ * Asynchronously releases the worker with the given id.
+ *
+ * @param workerID id of the worker to release
+ * @return a future with no value
+ */
+ CompletableFuture<Void> releaseWorker(String workerID);
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
new file mode 100644
index 000000000..bb9e6610d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.thirdparty.kubernetes;
+
+import org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.CreateWorkerResult;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.ThirdPartyResourceManager;
+
+import com.hazelcast.spi.impl.NodeEngine;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link ThirdPartyResourceManager} placeholder named for Kubernetes. Neither worker
+ * operation is implemented yet: both return a future that is already completed
+ * exceptionally with an {@link UnsupportedOperationException}, instead of {@code null},
+ * so callers that chain on the returned future fail with a clear cause rather than a
+ * NullPointerException.
+ */
+public class KubernetesResourceManager extends AbstractResourceManager implements ThirdPartyResourceManager {
+
+    public KubernetesResourceManager(NodeEngine nodeEngine) {
+        super(nodeEngine);
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return a future completed exceptionally with UnsupportedOperationException; never null
+     */
+    @Override
+    public CompletableFuture<CreateWorkerResult> createNewWorker(ResourceProfile resourceProfile) {
+        return notSupported("createNewWorker");
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return a future completed exceptionally with UnsupportedOperationException; never null
+     */
+    @Override
+    public CompletableFuture<Void> releaseWorker(String workerID) {
+        return notSupported("releaseWorker");
+    }
+
+    /** Builds an already-failed future naming the unsupported operation. */
+    private static <T> CompletableFuture<T> notSupported(String operation) {
+        // completeExceptionally is used instead of failedFuture to stay compatible with Java 8.
+        CompletableFuture<T> future = new CompletableFuture<>();
+        future.completeExceptionally(new UnsupportedOperationException(
+            "KubernetesResourceManager does not support " + operation + " yet"));
+        return future;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
new file mode 100644
index 000000000..1c9abdf04
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.thirdparty.yarn;
+
+import org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.CreateWorkerResult;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.ThirdPartyResourceManager;
+
+import com.hazelcast.spi.impl.NodeEngine;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link ThirdPartyResourceManager} placeholder named for YARN. Neither worker operation
+ * is implemented yet: both return a future that is already completed exceptionally with
+ * an {@link UnsupportedOperationException}, instead of {@code null}, so callers that chain
+ * on the returned future fail with a clear cause rather than a NullPointerException.
+ */
+public class YarnResourceManager extends AbstractResourceManager implements ThirdPartyResourceManager {
+
+    public YarnResourceManager(NodeEngine nodeEngine) {
+        super(nodeEngine);
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return a future completed exceptionally with UnsupportedOperationException; never null
+     */
+    @Override
+    public CompletableFuture<CreateWorkerResult> createNewWorker(ResourceProfile resourceProfile) {
+        return notSupported("createNewWorker");
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return a future completed exceptionally with UnsupportedOperationException; never null
+     */
+    @Override
+    public CompletableFuture<Void> releaseWorker(String workerID) {
+        return notSupported("releaseWorker");
+    }
+
+    /** Builds an already-failed future naming the unsupported operation. */
+    private static <T> CompletableFuture<T> notSupported(String operation) {
+        // completeExceptionally is used instead of failedFuture to stay compatible with Java 8.
+        CompletableFuture<T> future = new CompletableFuture<>();
+        future.completeExceptionally(new UnsupportedOperationException(
+            "YarnResourceManager does not support " + operation + " yet"));
+        return future;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
new file mode 100644
index 000000000..329fd9dd1
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.worker;
+
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import com.hazelcast.cluster.Address;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * Describes the state of a worker: its id and address, two resource profiles and the arrays
+ * of assigned and unassigned slots. Accessors, equals/hashCode and toString are generated by
+ * Lombok's {@code @Data}, so the field declarations below define that generated API.
+ */
+@Data
+public class WorkerProfile implements Serializable {
+
+ private final String workerID;
+
+ private final Address address;
+
+ // Not set by the constructor; stays null until assigned through the generated setter.
+ // NOTE(review): presumably the worker's total resources - confirm against the code that fills it.
+ private ResourceProfile profile;
+
+ // Initialised by the constructor to an empty ResourceProfile.
+ private ResourceProfile unassignedResource;
+
+ // Not set by the constructor; stays null until assigned through the generated setter.
+ private SlotProfile[] assignedSlots;
+
+ // Not set by the constructor; stays null until assigned through the generated setter.
+ private SlotProfile[] unassignedSlots;
+
+ public WorkerProfile(String workerID, Address address) {
+ this.workerID = workerID;
+ this.address = address;
+ this.unassignedResource = new ResourceProfile();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index bb99792ff..6c561a709 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -26,19 +26,30 @@ import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import com.google.common.collect.Lists;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
public class PipelineBaseScheduler implements JobScheduler {
private static final ILogger LOGGER = Logger.getLogger(PipelineBaseScheduler.class);
private final PhysicalPlan physicalPlan;
+
+ private final long jobId;
private final JobMaster jobMaster;
private final ResourceManager resourceManager;
@@ -46,6 +57,7 @@ public class PipelineBaseScheduler implements JobScheduler {
this.physicalPlan = physicalPlan;
this.jobMaster = jobMaster;
this.resourceManager = jobMaster.getResourceManager();
+ this.jobId = physicalPlan.getJobImmutableInformation().getJobId();
}
@Override
@@ -56,18 +68,23 @@ public class PipelineBaseScheduler implements JobScheduler {
handlePipelineStateUpdateError(pipeline, PipelineState.SCHEDULED);
return null;
}
- if (!applyResourceForPipeline(pipeline)) {
- return null;
+ Map<PhysicalVertex, SlotProfile> slotProfiles;
+ try {
+ slotProfiles = applyResourceForPipeline(pipeline);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
+ pipeline.whenComplete((state, error) -> releasePipelineResource(Lists.newArrayList(slotProfiles.values())));
// deploy pipeline
return CompletableFuture.supplyAsync(() -> {
- deployPipeline(pipeline);
+ // TODO before deploy should check slotProfiles is exist, because it maybe can't use when retry.
+ deployPipeline(pipeline, slotProfiles);
return null;
});
- }).filter(x -> x != null).collect(Collectors.toList());
+ }).filter(Objects::nonNull).collect(Collectors.toList());
try {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
- collect.toArray(new CompletableFuture[collect.size()]));
+ collect.toArray(new CompletableFuture[0]));
voidCompletableFuture.get();
} catch (Exception e) {
// cancel pipeline and throw an exception
@@ -80,64 +97,73 @@ public class PipelineBaseScheduler implements JobScheduler {
}
}
- private boolean applyResourceForPipeline(@NonNull SubPlan subPlan) {
- try {
- // apply resource for coordinators
- subPlan.getCoordinatorVertexList().forEach(coordinator -> applyResourceForTask(coordinator));
-
- // apply resource for other tasks
- subPlan.getPhysicalVertexList().forEach(task -> applyResourceForTask(task));
- } catch (JobNoEnoughResourceException e) {
- LOGGER.severe(e);
- return false;
+ private void releasePipelineResource(List<SlotProfile> slotProfiles) {
+ if (null == slotProfiles || slotProfiles.isEmpty()) {
+ return;
}
-
- return true;
+ resourceManager.releaseResources(jobId, slotProfiles).join();
}
- private void applyResourceForTask(PhysicalVertex task) {
+ private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) throws Exception {
try {
- // TODO If there is no enough resources for tasks, we need add some wait profile
- if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
- resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
- task.getTaskGroup().getTaskGroupLocation());
- } else {
- handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+ Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
+ Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
+ subPlan.getCoordinatorVertexList().forEach(coordinator -> {
+ coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+ });
+
+ subPlan.getPhysicalVertexList().forEach(task -> {
+ // TODO If there are not enough resources for the tasks, we need to add some wait profile
+ if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+ // TODO custom resource size
+ futures.put(task, resourceManager.applyResource(jobId, new ResourceProfile()));
+ } else {
+ handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+ }
+ });
+ for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
+ try {
+ slotProfiles.put(future.getKey(), future.getValue().get());
+ } catch (NoEnoughResourceException e) {
+ // TODO custom exception with pipelineID, jobName etc.
+ throw new JobNoEnoughResourceException("No enough resource to execute pipeline", e);
+ }
}
- } catch (JobNoEnoughResourceException e) {
+ return slotProfiles;
+ } catch (JobNoEnoughResourceException | ExecutionException | InterruptedException e) {
LOGGER.severe(e);
+ throw e;
}
}
- private CompletableFuture<Void> deployTask(PhysicalVertex task) {
+ private CompletableFuture<Void> deployTask(PhysicalVertex task, Supplier<Void> deployMethod) {
if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
// deploy is a time-consuming operation, so we do it async
- return CompletableFuture.supplyAsync(() -> {
- task.deploy(
- resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
- task.getTaskGroup().getTaskGroupLocation()));
- return null;
- });
+ return CompletableFuture.supplyAsync(deployMethod);
} else {
handleTaskStateUpdateError(task, ExecutionState.DEPLOYING);
}
return null;
}
- private void deployPipeline(@NonNull SubPlan pipeline) {
+ private void deployPipeline(@NonNull SubPlan pipeline, Map<PhysicalVertex, SlotProfile> slotProfiles) {
if (pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING)) {
- List<CompletableFuture> deployCoordinatorFuture =
- pipeline.getCoordinatorVertexList().stream().map(task -> deployTask(task)).filter(x -> x != null)
- .collect(Collectors.toList());
+ List<CompletableFuture<?>> deployCoordinatorFuture =
+ pipeline.getCoordinatorVertexList().stream().map(coordinator -> deployTask(coordinator, () -> {
+ coordinator.deployOnMaster();
+ return null;
+ })).filter(Objects::nonNull).collect(Collectors.toList());
- List<CompletableFuture> deployTaskFuture =
- pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task)).filter(x -> x != null)
- .collect(Collectors.toList());
+ List<CompletableFuture<?>> deployTaskFuture =
+ pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task, () -> {
+ task.deploy(slotProfiles.get(task));
+ return null;
+ })).filter(Objects::nonNull).collect(Collectors.toList());
try {
deployCoordinatorFuture.addAll(deployTaskFuture);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
- deployCoordinatorFuture.toArray(new CompletableFuture[deployCoordinatorFuture.size()]));
+ deployCoordinatorFuture.toArray(new CompletableFuture[0]));
voidCompletableFuture.get();
if (!pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING)) {
LOGGER.info(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
new file mode 100644
index 000000000..dfa2d753b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
+
+import com.hazelcast.internal.serialization.DataSerializerHook;
+import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+/**
+ * Hazelcast {@link DataSerializerHook} for the resource manager operations. It exposes the factory id
+ * and a {@link DataSerializableFactory} that instantiates the operation matching a given type id.
+ */
+public class ResourceDataSerializerHook implements DataSerializerHook {
+
+    /** Type id mapped to {@link WorkerHeartbeatOperation}. */
+    public static final int WORKER_HEARTBEAT_TYPE = 1;
+
+    /** Type id mapped to {@link RequestSlotOperation}. */
+    public static final int REQUEST_SLOT_TYPE = 2;
+
+    /** Type id mapped to {@link ReleaseSlotOperation}. */
+    public static final int RELEASE_SLOT_TYPE = 3;
+
+    /** Type id mapped to {@link ResetResourceOperation}. */
+    public static final int RESET_RESOURCE_TYPE = 4;
+
+    /** Factory id obtained from {@link FactoryIdHelper} using the SeaTunnel resource factory constants. */
+    public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
+        SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
+        SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
+    );
+
+    @Override
+    public int getFactoryId() {
+        return FACTORY_ID;
+    }
+
+    @Override
+    public DataSerializableFactory createFactory() {
+        return new Factory();
+    }
+
+    /**
+     * Creates an empty operation instance for each of the type ids declared by the enclosing hook.
+     */
+    private static class Factory implements DataSerializableFactory {
+
+        /**
+         * @param typeId one of the {@code *_TYPE} constants of {@link ResourceDataSerializerHook}
+         * @return a new, empty instance of the operation registered for {@code typeId}
+         * @throws IllegalArgumentException if {@code typeId} is not one of the known type ids
+         */
+        @Override
+        public IdentifiedDataSerializable create(int typeId) {
+            final IdentifiedDataSerializable operation;
+            switch (typeId) {
+                case WORKER_HEARTBEAT_TYPE:
+                    operation = new WorkerHeartbeatOperation();
+                    break;
+                case REQUEST_SLOT_TYPE:
+                    operation = new RequestSlotOperation();
+                    break;
+                case RELEASE_SLOT_TYPE:
+                    operation = new ReleaseSlotOperation();
+                    break;
+                case RESET_RESOURCE_TYPE:
+                    operation = new ResetResourceOperation();
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown type id " + typeId);
+            }
+            return operation;
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
new file mode 100644
index 000000000..5c60d25cf
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.service.slot;
+
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Default {@link SlotService} of a SeaTunnel server node: it keeps the bookkeeping of the slots of this
+ * worker and periodically reports it to the master node with a {@link WorkerHeartbeatOperation}.
+ *
+ * <p>Two modes exist. With fixed slots ({@code dynamicSlot == false}) {@code slotNumber} slots are created
+ * in {@link #init()}, each holding an equal share of the JVM max memory. With dynamic slots a new slot is
+ * created for every request as long as the unassigned resource of the node is large enough.
+ *
+ * <p>All methods that change the slot bookkeeping ({@link #requestSlot}, {@link #releaseSlot},
+ * and the re-initialization done by {@link #reset()}) synchronize on this instance.
+ */
+public class DefaultSlotService implements SlotService {
+
+    private static final ILogger LOGGER = Logger.getLogger(DefaultSlotService.class);
+    // Used both as the heartbeat period (milliseconds) and as the last argument of the heartbeat RetryMaterial.
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT = 2000;
+    // First argument of the heartbeat RetryMaterial.
+    private static final int HEARTBEAT_RETRY_TIME = 5;
+    private final NodeEngineImpl nodeEngine;
+
+    private AtomicReference<ResourceProfile> unassignedResource;
+
+    private AtomicReference<ResourceProfile> assignedResource;
+
+    private ConcurrentMap<Integer, SlotProfile> assignedSlots;
+
+    private ConcurrentMap<Integer, SlotProfile> unassignedSlots;
+    // Null until init() has been called.
+    private ScheduledExecutorService scheduledExecutorService;
+    private final String serviceID;
+    private final boolean dynamicSlot;
+    private final int slotNumber;
+    // true from init() until the first requestSlot(); reset() only re-initializes while this is false.
+    private volatile boolean initStatus;
+    private final IdGenerator idGenerator;
+    private final TaskExecutionService taskExecutionService;
+    private ConcurrentMap<Integer, SlotContext> contexts;
+
+    /**
+     * Only stores the collaborators; no slot state exists until {@link #init()} is called.
+     *
+     * @param nodeEngine           Hazelcast node engine of this member
+     * @param taskExecutionService service handed to every {@link SlotContext} created by this service
+     * @param dynamicSlot          whether slots are created on demand instead of up front
+     * @param slotNumber           number of fixed slots, only read when {@code dynamicSlot} is false
+     */
+    public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot, int slotNumber) {
+        this.nodeEngine = nodeEngine;
+        this.dynamicSlot = dynamicSlot;
+        this.taskExecutionService = taskExecutionService;
+        this.slotNumber = slotNumber;
+        this.serviceID = nodeEngine.getThisAddress().toString();
+        this.idGenerator = new IdGenerator();
+    }
+
+    /**
+     * Creates fresh (empty) slot bookkeeping, creates the fixed slots when not in dynamic mode and starts
+     * the heartbeat thread, which sends the current {@link WorkerProfile} to the master every
+     * {@code DEFAULT_HEARTBEAT_TIMEOUT} milliseconds. A failed heartbeat is only logged.
+     */
+    @Override
+    public void init() {
+        initStatus = true;
+        contexts = new ConcurrentHashMap<>();
+        assignedSlots = new ConcurrentHashMap<>();
+        unassignedSlots = new ConcurrentHashMap<>();
+        unassignedResource = new AtomicReference<>(new ResourceProfile());
+        assignedResource = new AtomicReference<>(new ResourceProfile());
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,
+            String.format("hz.%s.seaTunnel.slotService.thread", nodeEngine.getHazelcastInstance().getName())));
+        if (!dynamicSlot) {
+            initFixedSlots();
+        }
+        unassignedResource.set(getNodeResource());
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                RetryUtils.retryWithException(() -> {
+                    LOGGER.fine("start send heartbeat to resource manager, this address: " + nodeEngine.getClusterService().getThisAddress());
+                    sendToMaster(new WorkerHeartbeatOperation(toWorkerProfile())).join();
+                    return null;
+                }, new RetryUtils.RetryMaterial(HEARTBEAT_RETRY_TIME, true, e -> true, DEFAULT_HEARTBEAT_TIMEOUT));
+            } catch (Exception e) {
+                // Never let the exception escape: it would cancel all further heartbeat executions.
+                LOGGER.severe(e);
+                LOGGER.severe("failed send heartbeat to resource manager, will retry later. this address: " + nodeEngine.getClusterService().getThisAddress());
+            }
+        }, 0, DEFAULT_HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Drops all slot state and starts over, but only if a slot was requested since the last
+     * {@link #init()} (or if {@link #init()} was never called). Otherwise this is a no-op.
+     */
+    @Override
+    public void reset() {
+        if (!initStatus) {
+            synchronized (this) {
+                // Re-check under the lock so that concurrent callers re-initialize only once.
+                if (!initStatus) {
+                    this.close();
+                    init();
+                }
+            }
+        }
+    }
+
+    /**
+     * Tries to assign a slot that satisfies {@code resourceProfile} to the job.
+     *
+     * @param jobId           id of the job requesting the slot
+     * @param resourceProfile resource the slot has to provide
+     * @return the current worker profile together with the assigned slot; the slot is {@code null}
+     *     when no slot can be provided
+     */
+    @Override
+    public synchronized SlotAndWorkerProfile requestSlot(long jobId, ResourceProfile resourceProfile) {
+        // The state is no longer pristine, so the next reset() has to re-initialize it.
+        initStatus = false;
+        LOGGER.info(String.format("received slot request, jobID: %d, resource profile: %s", jobId, resourceProfile));
+        SlotProfile profile = selectBestMatchSlot(resourceProfile);
+        if (profile != null) {
+            profile.assign(jobId);
+            assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
+            unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
+            unassignedSlots.remove(profile.getSlotID());
+            assignedSlots.put(profile.getSlotID(), profile);
+            contexts.computeIfAbsent(profile.getSlotID(), p -> new SlotContext(profile.getSlotID(), taskExecutionService));
+        }
+        return new SlotAndWorkerProfile(toWorkerProfile(), profile);
+    }
+
+    /**
+     * @param slotID id of an assigned slot
+     * @return the context created when the slot was assigned, or {@code null} if the slot is not assigned
+     */
+    public SlotContext getSlotContext(int slotID) {
+        return contexts.get(slotID);
+    }
+
+    /**
+     * Returns the resource of an assigned slot to the unassigned pool. In fixed slot mode the slot itself
+     * becomes available again; in dynamic mode the slot is discarded.
+     *
+     * <p>Synchronized like {@link #requestSlot}, because both methods update the same bookkeeping.
+     *
+     * @param jobId   id of the job that owns the slot
+     * @param profile the slot to release
+     * @throws WrongTargetSlotException if the slot is not assigned by this service or is owned by another job
+     */
+    @Override
+    public synchronized void releaseSlot(long jobId, SlotProfile profile) {
+        LOGGER.info(String.format("received slot release request, jobID: %d, slot: %s", jobId, profile));
+        // Single lookup, so the existence check and the ownership check look at the same entry.
+        SlotProfile assignedSlot = assignedSlots.get(profile.getSlotID());
+        if (assignedSlot == null) {
+            throw new WrongTargetSlotException("Not exist this slot in slot service, slot profile: " + profile);
+        }
+
+        if (assignedSlot.getOwnerJobID() != jobId) {
+            throw new WrongTargetSlotException(String.format("The profile %s not belong with job %d",
+                assignedSlot, jobId));
+        }
+
+        assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
+        unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
+        profile.unassigned();
+        if (!dynamicSlot) {
+            unassignedSlots.put(profile.getSlotID(), profile);
+        }
+        assignedSlots.remove(profile.getSlotID());
+        contexts.remove(profile.getSlotID());
+    }
+
+    /**
+     * Stops the heartbeat thread. Safe to call before {@link #init()}, in which case nothing happens.
+     */
+    @Override
+    public void close() {
+        // The executor only exists after init(); reset() may call close() on a never-initialized service.
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdown();
+        }
+    }
+
+    /**
+     * Picks the slot to hand out for a request.
+     *
+     * @param profile requested resource
+     * @return in dynamic mode a new slot of exactly the requested size if the node has enough unassigned
+     *     resource; in fixed mode the smallest unassigned slot (by heap memory, then by CPU cores) that is
+     *     large enough; {@code null} if the request cannot be satisfied
+     */
+    private SlotProfile selectBestMatchSlot(ResourceProfile profile) {
+        if (unassignedSlots.isEmpty() && !dynamicSlot) {
+            return null;
+        }
+        if (dynamicSlot) {
+            if (unassignedResource.get().enoughThan(profile)) {
+                return new SlotProfile(nodeEngine.getThisAddress(), (int) idGenerator.getNextId(), profile);
+            }
+        } else {
+            Optional<SlotProfile> result = unassignedSlots.values().stream()
+                .filter(slot -> slot.getResourceProfile().enoughThan(profile))
+                .min((slot1, slot2) -> {
+                    // Long.compare/Integer.compare instead of subtraction, which could overflow.
+                    int byHeapMemory = Long.compare(slot1.getResourceProfile().getHeapMemory().getBytes(),
+                        slot2.getResourceProfile().getHeapMemory().getBytes());
+                    if (byHeapMemory != 0) {
+                        return byHeapMemory;
+                    }
+                    return Integer.compare(slot1.getResourceProfile().getCpu().getCore(),
+                        slot2.getResourceProfile().getCpu().getCore());
+                });
+            return result.orElse(null);
+        }
+        return null;
+    }
+
+    /**
+     * Creates {@code slotNumber} unassigned slots with ids {@code 0..slotNumber-1}, each with
+     * {@code maxMemory / slotNumber} bytes of memory and a CPU value of 0.
+     */
+    private void initFixedSlots() {
+        long maxMemory = Runtime.getRuntime().maxMemory();
+        for (int i = 0; i < slotNumber; i++) {
+            unassignedSlots.put(i, new SlotProfile(nodeEngine.getThisAddress(), i,
+                new ResourceProfile(CPU.of(0), Memory.of(maxMemory / slotNumber))));
+        }
+    }
+
+    /**
+     * @return a snapshot of this worker (node resource, assigned and unassigned slots, unassigned resource)
+     */
+    public WorkerProfile toWorkerProfile() {
+        WorkerProfile workerProfile = new WorkerProfile(serviceID, nodeEngine.getThisAddress());
+        workerProfile.setProfile(getNodeResource());
+        workerProfile.setAssignedSlots(assignedSlots.values().toArray(new SlotProfile[0]));
+        workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new SlotProfile[0]));
+        workerProfile.setUnassignedResource(unassignedResource.get());
+        return workerProfile;
+    }
+
+    /**
+     * @return the total resource of this node: the JVM max memory and a CPU value of 0
+     */
+    private ResourceProfile getNodeResource() {
+        return new ResourceProfile(CPU.of(0), Memory.of(Runtime.getRuntime().maxMemory()));
+    }
+
+    /**
+     * Invokes {@code operation} on the current master member of the cluster.
+     *
+     * @param operation the operation to send
+     * @return the future of the invocation
+     */
+    public <E> InvocationFuture<E> sendToMaster(Operation operation) {
+        InvocationBuilder invocationBuilder = nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress());
+        return invocationBuilder.invoke();
+    }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
similarity index 52%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
index d350019b7..2523e79d2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
@@ -15,32 +15,32 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.service.slot;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
-import java.io.IOException;
import java.io.Serializable;
-public interface Task extends Serializable {
+public class SlotAndWorkerProfile implements Serializable {
- default void init() throws Exception {
- }
-
- @NonNull
- ProgressState call() throws Exception;
+ private final WorkerProfile workerProfile;
- @NonNull
- Long getTaskID();
+ private final SlotProfile slotProfile;
- default boolean isThreadsShare() {
- return false;
+ public SlotAndWorkerProfile(WorkerProfile workerProfile, SlotProfile slotProfile) {
+ this.workerProfile = workerProfile;
+ this.slotProfile = slotProfile;
}
- default void close() throws IOException {
+ public WorkerProfile getWorkerProfile() {
+ return workerProfile;
}
- default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext){
+ /**
+ * Get the slot profile returned by the worker. Could be null if no slot can be provided.
+ */
+ public SlotProfile getSlotProfile() {
+ return slotProfile;
}
-
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
index d350019b7..d975de191 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
@@ -15,32 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.service.slot;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
-import java.io.IOException;
-import java.io.Serializable;
+public class SlotContext {
+ private final TaskExecutionService taskExecutionService;
+ private final int slotID;
-public interface Task extends Serializable {
-
- default void init() throws Exception {
- }
-
- @NonNull
- ProgressState call() throws Exception;
-
- @NonNull
- Long getTaskID();
-
- default boolean isThreadsShare() {
- return false;
+ public SlotContext(int slotID, TaskExecutionService taskExecutionService) {
+ this.slotID = slotID;
+ this.taskExecutionService = taskExecutionService;
+ this.taskExecutionService.setSlotContext(this);
}
- default void close() throws IOException {
+ public int getSlotID() {
+ return slotID;
}
- default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext){
+ public TaskExecutionService getTaskExecutionService() {
+ return taskExecutionService;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
index cb459944c..5d5a5ab14 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
@@ -15,16 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.service.slot;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+public interface SlotService {
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+ void init();
+
+ void reset();
+
+ SlotAndWorkerProfile requestSlot(long jobID, ResourceProfile resourceProfile);
+
+ SlotContext getSlotContext(int slotID);
+
+ void releaseSlot(long jobId, SlotProfile slotProfile);
+
+ void close();
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/WrongTargetSlotException.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/WrongTargetSlotException.java
index cb459944c..16a3dae43 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/WrongTargetSlotException.java
@@ -15,16 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.service.slot;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+public class WrongTargetSlotException extends RuntimeException {
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+ public WrongTargetSlotException() {
+ }
-public interface ResourceManager {
- Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
- @NonNull
- Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+ public WrongTargetSlotException(String message) {
+ super(message);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 4f7c64afc..4797ea914 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -57,7 +57,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
private Map<TaskLocation, Address> taskMemberMapping;
private Map<Long, TaskLocation> taskIDToTaskLocationMapping;
- private EnumeratorState currState;
+ private volatile EnumeratorState currState;
private CompletableFuture<Void> readerRegisterFuture;
private CompletableFuture<Void> readerFinishFuture;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 1667caf4d..155307a6e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.operation;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.operation.AsyncOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import com.hazelcast.internal.nio.IOUtil;
@@ -32,18 +33,21 @@ import java.io.IOException;
public class DeployTaskOperation extends AsyncOperation {
private Data taskImmutableInformation;
+ private SlotProfile slotProfile;
public DeployTaskOperation() {
}
- public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
+ public DeployTaskOperation(@NonNull SlotProfile slotProfile, @NonNull Data taskImmutableInformation) {
this.taskImmutableInformation = taskImmutableInformation;
+ this.slotProfile = slotProfile;
}
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
SeaTunnelServer server = getService();
- return server.getTaskExecutionService().deployTask(taskImmutableInformation);
+ return server.getSlotService().getSlotContext(slotProfile.getSlotID())
+ .getTaskExecutionService().deployTask(taskImmutableInformation);
}
@Override
@@ -55,11 +59,13 @@ public class DeployTaskOperation extends AsyncOperation {
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
IOUtil.writeData(out, taskImmutableInformation);
+ out.writeObject(slotProfile);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
taskImmutableInformation = IOUtil.readData(in);
+ slotProfile = in.readObject();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index ccbc8d519..ccc2d0bce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -36,7 +36,7 @@ import java.io.IOException;
public class SinkRegisterOperation extends Operation implements IdentifiedDataSerializable {
private static final ILogger LOGGER = Logger.getLogger(SinkRegisterOperation.class);
- private static final int RETRY_TIME = 5;
+ private static final int RETRY_NUMBER = 5;
private static final int RETRY_INTERVAL = 2000;
private TaskLocation writerTaskID;
private TaskLocation committerTaskID;
@@ -54,10 +54,9 @@ public class SinkRegisterOperation extends Operation implements IdentifiedDataSe
SeaTunnelServer server = getService();
Address readerAddress = getCallerAddress();
SinkAggregatedCommitterTask<?> task = null;
- for (int i = 0; i < RETRY_TIME; i++) {
+ for (int i = 0; i < RETRY_NUMBER; i++) {
try {
- task = server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation())
- .getTaskGroup().getTask(committerTaskID.getTaskID());
+ task = server.getTaskExecutionService().getTask(committerTaskID);
break;
} catch (NullPointerException e) {
LOGGER.warning("can't get committer task , waiting task started");
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
index 558593551..84d523b80 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
@@ -46,7 +46,7 @@ public class SinkUnregisterOperation extends Operation implements IdentifiedData
public void run() throws Exception {
SeaTunnelServer server = getService();
SinkAggregatedCommitterTask<?> task =
- server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
+ server.getTaskExecutionService().getTask(committerTaskID);
task.receivedWriterUnregister(currentTaskID);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index ddbe9ae59..a2ccdc4ba 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -50,9 +50,7 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
public void run() throws Exception {
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
- SourceSeaTunnelTask<?, SplitT> task =
- server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupLocation()).getTaskGroup()
- .getTask(taskID.getTaskID());
+ SourceSeaTunnelTask<?, SplitT> task = server.getTaskExecutionService().getTask(taskID);
task.receivedSourceSplit(splits);
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
index f25b895b3..7533f75e6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
@@ -46,9 +46,7 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
public void run() throws Exception {
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
- SourceSeaTunnelTask<?, ?> task =
- server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupLocation())
- .getTaskGroup().getTask(readerLocation.getTaskID());
+ SourceSeaTunnelTask<?, ?> task = server.getTaskExecutionService().getTask(readerLocation);
task.close();
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index d214368a1..ba853833f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -50,9 +50,7 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
- SourceSplitEnumeratorTask<?> task =
- server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
- .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+ SourceSplitEnumeratorTask<?> task = server.getTaskExecutionService().getTask(enumeratorTaskID);
task.requestSplit(taskID.getTaskID());
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 58b09146e..0cba1c1f7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -49,8 +49,7 @@ public class SourceNoMoreElementOperation extends Operation implements Identifie
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask<?> task =
- server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
- .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+ server.getTaskExecutionService().getTask(enumeratorTaskID);
task.readerFinished(currentTaskID.getTaskID());
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index a2f08bf70..f3c88c25b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -57,8 +57,7 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
Address readerAddress = getCallerAddress();
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask<?> task =
- server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation()).getTaskGroup()
- .getTask(enumeratorTaskID.getTaskID());
+ server.getTaskExecutionService().getTask(enumeratorTaskID);
task.receivedReader(readerTaskID, readerAddress);
return null;
}, new RetryUtils.RetryMaterial(RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
index bf045cd23..ced85edfa 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -16,4 +16,5 @@
#
org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook
-org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook
\ No newline at end of file
+org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook
+org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
new file mode 100644
index 000000000..89274d2a6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class ResourceManagerTest extends AbstractSeaTunnelServerTest {
+ // Exercises applying for and releasing slot resources through the server's ResourceManager.
+ private ResourceManager resourceManager;
+ // Job id passed to the apply/release calls below; jobId + 1 is used as the non-matching id.
+ private final long jobId = 5;
+ // Runs the parent setup first, then fetches the ResourceManager from the server under test.
+ @Before
+ public void before() {
+ super.before();
+ resourceManager = server.getResourceManager();
+ server.getSlotService(); // result is discarded; NOTE(review): presumably forces SlotService creation - confirm
+ }
+ // Applies for three profiles, compares the returned slots, then checks three failure paths.
+ @Test
+ public void testApplyRequest() throws ExecutionException, InterruptedException {
+ List<ResourceProfile> resourceProfiles = new ArrayList<>();
+ resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(100)));
+ resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(200)));
+ resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(300)));
+ List<SlotProfile> slotProfiles = resourceManager.applyResources(jobId, resourceProfiles).get();
+ // Each returned slot must report the same heap-memory byte count as the request at the same index.
+ Assert.assertEquals(resourceProfiles.get(0).getHeapMemory().getBytes(), slotProfiles.get(0).getResourceProfile().getHeapMemory().getBytes());
+ Assert.assertEquals(resourceProfiles.get(1).getHeapMemory().getBytes(), slotProfiles.get(1).getResourceProfile().getHeapMemory().getBytes());
+ Assert.assertEquals(resourceProfiles.get(2).getHeapMemory().getBytes(), slotProfiles.get(2).getResourceProfile().getHeapMemory().getBytes());
+ // Releasing the slots under a different job id (jobId + 1) is expected to fail with ExecutionException.
+ Assert.assertThrows(ExecutionException.class, () -> resourceManager.releaseResources(jobId + 1, slotProfiles).get());
+ // Releasing under the job id used for the apply call is expected to complete without throwing.
+ resourceManager.releaseResources(jobId, slotProfiles).get();
+ // Releasing the same slots a second time is expected to fail with ExecutionException.
+ Assert.assertThrows(ExecutionException.class, () -> resourceManager.releaseResources(jobId, slotProfiles).get());
+ // Applying for a single profile built with Memory.of(Long.MAX_VALUE) is expected to fail with ExecutionException.
+ Assert.assertThrows(ExecutionException.class, () -> resourceManager.applyResource(jobId, new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE))).get());
+ }
+
+}