You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/01 09:56:01 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][ResourceManager] Add ResourceManager (#2472)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 176e4ac75 [Engine][ResourceManager] Add ResourceManager (#2472)
176e4ac75 is described below

commit 176e4ac753ce9803f1c634593a6f7bb827ce775a
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Sep 1 17:55:56 2022 +0800

    [Engine][ResourceManager] Add ResourceManager (#2472)
    
    * [Engine][ResourceManager] add resource manager interface
    
    * [Engine][ResourceManager] add resource manager interface
    
    * [Engine][ResourceManager] Add SlotService
    
    * [Engine][ResourceManager] Add seatunnel resource manager and slot service
    
    * [Engine][Task] Add deploy on master
    
    * [Engine][ResourceManager] Add resource manager and slot service
    
    * [Engine][ResourceManager] Add resource manager and slot service
    
    * [Engine][ResourceManager] Add volatile for state
    
    * [Engine][ResourceManager] Add lazy load from ResourceManager
    
    * [Engine][ResourceManager] Fix bugs
    
    * [Engine][ResourceManager] Fix merge problems
    
    * [Engine][ResourceManager] Fix checkstyle
    
    * [Engine][ResourceManager] add UT and improve encapsulation of the slot
    
    * [Engine][ResourceManager] fix UT error
    
    * [Engine][ResourceManager] change WorkerTaskLocation to TaskLocation
    
    * [Engine][ResourceManager] fix close bug
    
    * [Engine][ResourceManager] fix review problems
    
    * [Engine][ResourceManager] fix review problems
    
    * [Engine][ResourceManager] remove HeartbeatManager and use hazelcast member service
    
    * [Engine][ResourceManager] Add reset from SlotService
---
 .../engine/common/runtime/DeployType.java}         |  16 +-
 .../engine/common/runtime/ExecutionMode.java}      |  15 +-
 .../serializeable/SeaTunnelFactoryIdConstant.java  |   4 +
 .../seatunnel/engine/server/SeaTunnelServer.java   |  54 ++++-
 .../engine/server/TaskExecutionService.java        |  31 ++-
 .../engine/server/dag/physical/PhysicalPlan.java   |   2 +-
 .../engine/server/dag/physical/PhysicalVertex.java |  79 +++++---
 .../engine/server/dag/physical/SubPlan.java        |  11 +-
 .../seatunnel/engine/server/execution/Task.java    |   2 +-
 .../server/execution/TaskExecutionContext.java     |  10 +-
 .../seatunnel/engine/server/master/JobMaster.java  |   7 +-
 .../resourcemanager/AbstractResourceManager.java   | 162 +++++++++++++++
 ...Manager.java => NoEnoughResourceException.java} |  14 +-
 .../server/resourcemanager/ResourceManager.java    |  34 +++-
 .../resourcemanager/ResourceManagerFactory.java    |  51 +++++
 .../resourcemanager/ResourceRequestHandler.java    | 194 ++++++++++++++++++
 .../resourcemanager/SimpleResourceManager.java     |  70 -------
 ...Manager.java => StandaloneResourceManager.java} |  13 +-
 ...er.java => UnsupportedDeployTypeException.java} |  13 +-
 .../opeartion/ReleaseSlotOperation.java}           |  42 ++--
 .../opeartion/RequestSlotOperation.java}           |  49 +++--
 .../opeartion/ResetResourceOperation.java          |  50 +++++
 .../opeartion/WorkerHeartbeatOperation.java}       |  38 ++--
 .../{ResourceManager.java => resource/CPU.java}    |  28 ++-
 .../{ResourceManager.java => resource/Memory.java} |  28 ++-
 .../Resource.java}                                 |  16 +-
 .../resourcemanager/resource/ResourceProfile.java  |  74 +++++++
 .../resourcemanager/resource/SlotProfile.java      |  84 ++++++++
 .../CreateWorkerResult.java}                       |  16 +-
 .../ThirdPartyResourceManager.java}                |  16 +-
 .../kubernetes/KubernetesResourceManager.java      |  44 +++++
 .../thirdparty/yarn/YarnResourceManager.java       |  43 ++++
 .../resourcemanager/worker/WorkerProfile.java      |  51 +++++
 .../server/scheduler/PipelineBaseScheduler.java    | 106 ++++++----
 .../serializable/ResourceDataSerializerHook.java   |  75 +++++++
 .../server/service/slot/DefaultSlotService.java    | 217 +++++++++++++++++++++
 .../slot/SlotAndWorkerProfile.java}                |  32 +--
 .../Task.java => service/slot/SlotContext.java}    |  32 ++-
 .../slot/SlotService.java}                         |  23 ++-
 .../slot/WrongTargetSlotException.java}            |  16 +-
 .../server/task/SourceSplitEnumeratorTask.java     |   2 +-
 .../server/task/operation/DeployTaskOperation.java |  10 +-
 .../task/operation/sink/SinkRegisterOperation.java |   7 +-
 .../operation/sink/SinkUnregisterOperation.java    |   2 +-
 .../operation/source/AssignSplitOperation.java     |   4 +-
 .../operation/source/CloseRequestOperation.java    |   4 +-
 .../operation/source/RequestSplitOperation.java    |   4 +-
 .../source/SourceNoMoreElementOperation.java       |   3 +-
 .../operation/source/SourceRegisterOperation.java  |   3 +-
 .../services/com.hazelcast.DataSerializerHook      |   3 +-
 .../resourcemanager/ResourceManagerTest.java       |  68 +++++++
 51 files changed, 1582 insertions(+), 390 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/DeployType.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/DeployType.java
index cb459944c..7c61c126c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/DeployType.java
@@ -15,16 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.common.runtime;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
-
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+public enum DeployType {
+    STANDALONE,
+    YARN,
+    KUBERNETES
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/ExecutionMode.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/ExecutionMode.java
index cb459944c..061d0c1c8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/runtime/ExecutionMode.java
@@ -15,16 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.common.runtime;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
-
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+public enum ExecutionMode {
+    LOCAL,
+    CLUSTER
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
index 0f2470495..f65c95e62 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
@@ -43,4 +43,8 @@ public final class SeaTunnelFactoryIdConstant {
     public static final String SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY =
             "hazelcast.serialization.ds.seatunnel.engine.task";
     public static final int SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID = -30004;
+
+    public static final String SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY =
+            "hazelcast.serialization.ds.seatunnel.engine.resource";
+    public static final int SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID = -30005;
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 08560c7db..7c912f304 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -22,6 +22,10 @@ import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
+import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
+import org.apache.seatunnel.engine.server.service.slot.SlotService;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.hazelcast.instance.impl.Node;
@@ -53,9 +57,11 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     private final ILogger logger;
     private final LiveOperationRegistry liveOperationRegistry;
 
+    private volatile SlotService slotService;
     private TaskExecutionService taskExecutionService;
 
     private final ExecutorService executorService;
+    private volatile ResourceManager resourceManager;
 
     private final SeaTunnelConfig seaTunnelConfig;
 
@@ -75,8 +81,20 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         logger.info("SeaTunnel server start...");
     }
 
-    public TaskExecutionService getTaskExecutionService() {
-        return this.taskExecutionService;
+    /**
+     * Lazy load for Slot Service
+     */
+    public SlotService getSlotService() {
+        if (slotService == null) {
+            synchronized (this) {
+                if (slotService == null) {
+                    SlotService service = new DefaultSlotService(nodeEngine, taskExecutionService, true, 2);
+                    service.init();
+                    slotService = service;
+                }
+            }
+        }
+        return slotService;
     }
 
     public JobMaster getJobMaster(Long jobId) {
@@ -86,10 +104,12 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     @Override
     public void init(NodeEngine engine, Properties hzProperties) {
         this.nodeEngine = (NodeEngineImpl) engine;
+        // TODO Determine whether to execute these methods on the master node according to the deploy type
         taskExecutionService = new TaskExecutionService(
             nodeEngine, nodeEngine.getProperties()
         );
         taskExecutionService.start();
+        getSlotService();
     }
 
     @Override
@@ -99,6 +119,12 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     @Override
     public void shutdown(boolean terminate) {
+        if (slotService != null) {
+            slotService.close();
+        }
+        if (resourceManager != null) {
+            resourceManager.close();
+        }
         taskExecutionService.shutdown();
     }
 
@@ -109,7 +135,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     @Override
     public void memberRemoved(MembershipServiceEvent event) {
-
+        resourceManager.memberRemoved(event);
     }
 
     @Override
@@ -129,12 +155,32 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         return liveOperationRegistry;
     }
 
+    /**
+     * Lazy load for resource manager
+     */
+    public ResourceManager getResourceManager() {
+        if (resourceManager == null) {
+            synchronized (this) {
+                if (resourceManager == null) {
+                    ResourceManager manager = new ResourceManagerFactory(nodeEngine).getResourceManager();
+                    manager.init();
+                    resourceManager = manager;
+                }
+            }
+        }
+        return resourceManager;
+    }
+
+    public TaskExecutionService getTaskExecutionService() {
+        return taskExecutionService;
+    }
+
     /**
      * call by client to submit job
      */
     public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
         CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
-        JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService);
+        JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService, getResourceManager());
         executorService.submit(() -> {
             try {
                 jobMaster.init();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 03ea4d53e..82a6bbf52 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -36,8 +36,10 @@ import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
 import org.apache.seatunnel.engine.server.operation.NotifyTaskStatusOperation;
+import org.apache.seatunnel.engine.server.service.slot.SlotContext;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 
 import com.google.common.collect.Lists;
@@ -83,6 +85,7 @@ public class TaskExecutionService {
     // key: TaskID
     private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
     private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
+    private SlotContext slotContext;
 
     public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -99,6 +102,10 @@ public class TaskExecutionService {
         executorService.shutdownNow();
     }
 
+    public void setSlotContext(SlotContext slotContext) {
+        this.slotContext = slotContext;
+    }
+
     public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
         return executionContexts.get(taskGroupLocation);
     }
@@ -125,14 +132,21 @@ public class TaskExecutionService {
         uncheckRun(startedLatch::await);
     }
 
+    public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(@NonNull Data taskImmutableInformation) {
+        TaskGroupImmutableInformation taskImmutableInfo =
+            nodeEngine.getSerializationService().toObject(taskImmutableInformation);
+        return deployTask(taskImmutableInfo);
+    }
+
+    public <T extends Task> T getTask(TaskLocation taskLocation) {
+        return this.getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup().getTask(taskLocation.getTaskID());
+    }
+
     public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(
-        @NonNull Data taskImmutableInformation
-    ) {
+        @NonNull TaskGroupImmutableInformation taskImmutableInfo) {
         CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
         TaskGroup taskGroup = null;
         try {
-            TaskGroupImmutableInformation taskImmutableInfo =
-                nodeEngine.getSerializationService().toObject(taskImmutableInformation);
             Set<URL> jars = taskImmutableInfo.getJars();
 
             if (!CollectionUtils.isEmpty(jars)) {
@@ -171,10 +185,11 @@ public class TaskExecutionService {
             ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap = new ConcurrentHashMap<>();
             final Map<Boolean, List<Task>> byCooperation =
                 tasks.stream()
-                    .peek(x -> {
-                        TaskExecutionContext taskExecutionContext = new TaskExecutionContext(x, nodeEngine);
-                        x.setTaskExecutionContext(taskExecutionContext);
-                        taskExecutionContextMap.put(x.getTaskID(), taskExecutionContext);
+                    .peek(task -> {
+                        TaskExecutionContext taskExecutionContext = new TaskExecutionContext(task, nodeEngine,
+                                slotContext);
+                        task.setTaskExecutionContext(taskExecutionContext);
+                        taskExecutionContextMap.put(task.getTaskID(), taskExecutionContext);
                     })
                     .collect(partitioningBy(Task::isThreadsShare));
             submitThreadShareTask(executionTracker, byCooperation.get(true));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 5a1daecf4..6fd0fe132 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -88,7 +88,7 @@ public class PhysicalPlan {
     }
 
     public void initStateFuture() {
-        pipelineList.stream().forEach(subPlan -> {
+        pipelineList.forEach(subPlan -> {
             PassiveCompletableFuture<PipelineState> future = subPlan.initStateFuture();
             future.whenComplete((v, t) -> {
                 // We need not handle t, Because we will not return t from Pipeline
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index d091fa330..08ab71c15 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -22,11 +22,13 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
 import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
@@ -43,6 +45,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 
 /**
  * PhysicalVertex is responsible for the scheduling and execution of a single task parallel
@@ -94,11 +97,6 @@ public class PhysicalVertex {
      */
     private final long[] stateTimestamps;
 
-    /**
-     * This future only can completion by the task run in {@link com.hazelcast.spi.impl.executionservice.ExecutionService }
-     */
-    private PassiveCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
-
     private final JobImmutableInformation jobImmutableInformation;
 
     private final long initializationTimestamp;
@@ -107,8 +105,6 @@ public class PhysicalVertex {
 
     private Address currentExecutionAddress;
 
-    private TaskGroupImmutableInformation taskGroupImmutableInformation;
-
     public PhysicalVertex(long physicalVertexId,
                           int subTaskGroupIndex,
                           @NonNull ExecutorService executorService,
@@ -155,23 +151,33 @@ public class PhysicalVertex {
         return new PassiveCompletableFuture<>(this.taskFuture);
     }
 
+    public void deployOnMaster() {
+        currentExecutionAddress = nodeEngine.getMasterAddress();
+        deployInternal(taskGroupImmutableInformation -> {
+            SeaTunnelServer server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+            return new PassiveCompletableFuture<>(server.getTaskExecutionService()
+                .deployTask(taskGroupImmutableInformation));
+        });
+    }
+
     @SuppressWarnings("checkstyle:MagicNumber")
     // This method must not throw an exception
-    public void deploy(@NonNull Address address) {
-        currentExecutionAddress = address;
-        taskGroupImmutableInformation = new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
-            nodeEngine.getSerializationService().toData(this.taskGroup),
-            this.pluginJarsUrls);
+    public void deploy(@NonNull SlotProfile slotProfile) {
+        currentExecutionAddress = slotProfile.getWorker();
+        deployInternal(taskGroupImmutableInformation -> new PassiveCompletableFuture<>(
+            nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+                    new DeployTaskOperation(slotProfile,
+                        nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+                    slotProfile.getWorker())
+                .invoke()));
+    }
 
+    private void deployInternal(Function<TaskGroupImmutableInformation, PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
+        TaskGroupImmutableInformation taskGroupImmutableInformation = getTaskGroupImmutableInformation();
+        PassiveCompletableFuture<TaskExecutionState> completeFuture;
         try {
             if (ExecutionState.DEPLOYING.equals(executionState.get())) {
-                waitForCompleteByExecutionService = new PassiveCompletableFuture<>(
-                    nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                            new DeployTaskOperation(
-                                nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
-                            currentExecutionAddress)
-                        .invoke());
-
+                completeFuture = deployMethod.apply(taskGroupImmutableInformation);
                 // may be canceling
                 if (!updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                     // If we found the task state turned to CANCELING after deployed to TaskExecutionService. We need
@@ -188,29 +194,42 @@ public class PhysicalVertex {
                                 this.getTaskFullName(), executionState.get()))));
                     }
                 }
+                monitorTask(completeFuture);
             } else if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
                 turnToEndState(ExecutionState.CANCELED);
                 taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(), null));
             } else {
                 turnToEndState(ExecutionState.FAILED);
                 taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(),
-                    new JobException(String.format("%s turn to a unexpected state"))));
+                    new JobException(String.format("%s turn to a unexpected state", getTaskFullName()))));
             }
 
         } catch (Throwable th) {
-            LOGGER.severe(String.format("%s deploy error with Exception: %s",
-                this.taskFullName,
-                ExceptionUtils.getMessage(th)));
-            turnToEndState(ExecutionState.FAILED);
-            taskFuture.complete(
-                new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, th));
+            failedByException(th);
         }
+    }
 
-        if (waitForCompleteByExecutionService == null) {
-            return;
-        }
+    private void failedByException(Throwable th) {
+        LOGGER.severe(String.format("%s deploy error with Exception: %s",
+            this.taskFullName,
+            ExceptionUtils.getMessage(th)));
+        turnToEndState(ExecutionState.FAILED);
+        taskFuture.complete(
+            new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, th));
+    }
 
-        waitForCompleteByExecutionService.whenComplete((v, t) -> {
+    private TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
+        return new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
+                nodeEngine.getSerializationService().toData(this.taskGroup),
+                this.pluginJarsUrls);
+    }
+
+    /**
+     * @param completeFuture This future can only be completed by the task running in
+     *                       {@link com.hazelcast.spi.impl.executionservice.ExecutionService }
+     */
+    private void monitorTask(PassiveCompletableFuture<TaskExecutionState> completeFuture) {
+        completeFuture.whenComplete((v, t) -> {
             try {
                 if (t != null) {
                     LOGGER.severe("An unexpected error occurred while the task was running", t);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index dc7974518..ea11a93d0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 public class SubPlan {
@@ -96,11 +97,11 @@ public class SubPlan {
     }
 
     public PassiveCompletableFuture<PipelineState> initStateFuture() {
-        physicalVertexList.stream().forEach(m -> {
+        physicalVertexList.forEach(m -> {
             addPhysicalVertexCallBack(m.initStateFuture());
         });
 
-        coordinatorVertexList.stream().forEach(m -> {
+        coordinatorVertexList.forEach(m -> {
             addPhysicalVertexCallBack(m.initStateFuture());
         });
 
@@ -136,11 +137,15 @@ public class SubPlan {
                     turnToEndState(PipelineState.FINISHED);
                     LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
                 }
-                pipelineFuture.complete(pipelineState.get());
+                this.pipelineFuture.complete(pipelineState.get());
             }
         });
     }
 
+    public void whenComplete(BiConsumer<? super PipelineState, ? super Throwable> action) {
+        this.pipelineFuture.whenComplete(action);
+    }
+
     private void turnToEndState(@NonNull PipelineState endState) {
         // consistency check
         if (pipelineState.get().isEndState()) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index d350019b7..8ec2796fe 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -40,7 +40,7 @@ public interface Task extends Serializable {
     default void close() throws IOException {
     }
 
-    default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext){
+    default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
     }
 
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 8e395cbe2..bab9db433 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.execution;
 
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.service.slot.SlotContext;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.logging.ILogger;
@@ -31,9 +32,12 @@ public class TaskExecutionContext {
     private final Task task;
     private final NodeEngineImpl nodeEngine;
 
-    public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine) {
+    private final SlotContext slotContext;
+
+    public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine, SlotContext slotContext) {
         this.task = task;
         this.nodeEngine = nodeEngine;
+        this.slotContext = slotContext;
     }
 
     public <E> InvocationFuture<E> sendToMaster(Operation operation) {
@@ -53,4 +57,8 @@ public class TaskExecutionContext {
     public <T> T getTask() {
         return (T) task;
     }
+
+    public SlotContext getSlotContext() {
+        return slotContext;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index fa517e500..536786dec 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -29,7 +29,6 @@ import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
-import org.apache.seatunnel.engine.server.resourcemanager.SimpleResourceManager;
 import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
 import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
 
@@ -69,14 +68,14 @@ public class JobMaster implements Runnable {
 
     public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
-                     @NonNull ExecutorService executorService) {
+                     @NonNull ExecutorService executorService, @NonNull ResourceManager resourceManager) {
         this.jobImmutableInformationData = jobImmutableInformationData;
         this.nodeEngine = nodeEngine;
         this.executorService = executorService;
         flakeIdGenerator =
-            this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
+                this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
 
-        this.resourceManager = new SimpleResourceManager(this.nodeEngine);
+        this.resourceManager = resourceManager;
     }
 
     public void init() throws Exception {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
new file mode 100644
index 000000000..a77068747
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.internal.services.MembershipServiceEvent;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Base {@link ResourceManager} that keeps the registered workers in memory and reaches them through
+ * Hazelcast operations.
+ *
+ * <p>Workers are recorded by {@link #heartbeat(WorkerProfile)} and dropped by
+ * {@link #memberRemoved(MembershipServiceEvent)}. Subclasses may override
+ * {@link #supportDynamicWorker()} and {@link #findNewWorker(List)} to obtain workers on demand.
+ */
+public abstract class AbstractResourceManager implements ResourceManager {
+
+    /** Pause, in milliseconds, between two checks for a registered worker. */
+    private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
+    private static final ILogger LOGGER = Logger.getLogger(AbstractResourceManager.class);
+
+    /** Latest profile of every registered worker, keyed by the worker ID reported in its heartbeat. */
+    protected final ConcurrentMap<String, WorkerProfile> registerWorker;
+
+    private final NodeEngine nodeEngine;
+
+    // Fixed to LOCAL here, so applyResources always waits for a worker to register first.
+    private final ExecutionMode mode = ExecutionMode.LOCAL;
+
+    public AbstractResourceManager(NodeEngine nodeEngine) {
+        this.registerWorker = new ConcurrentHashMap<>();
+        this.nodeEngine = nodeEngine;
+    }
+
+    /**
+     * Does nothing by default.
+     */
+    @Override
+    public void init() {
+    }
+
+    /**
+     * Applies for a single slot by delegating to {@link #applyResources(long, List)} with a one-element list.
+     *
+     * @param jobId           the job the slot is requested for
+     * @param resourceProfile the resources the slot has to provide
+     * @return a future that yields the granted slot, or fails with the error of the underlying request
+     */
+    @Override
+    public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile) throws NoEnoughResourceException {
+        CompletableFuture<SlotProfile> completableFuture = new CompletableFuture<>();
+        applyResources(jobId, Collections.singletonList(resourceProfile)).whenComplete((profile, error) -> {
+            if (error != null) {
+                completableFuture.completeExceptionally(error);
+            } else {
+                completableFuture.complete(profile.get(0));
+            }
+        });
+        return completableFuture;
+    }
+
+    /**
+     * Blocks the calling thread until at least one worker has registered; only waits in
+     * {@link ExecutionMode#LOCAL} mode.
+     *
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
+     */
+    private void waitingWorkerRegister() {
+        if (ExecutionMode.LOCAL.equals(mode)) {
+            // Local mode, should wait worker(master node) register.
+            try {
+                while (registerWorker.isEmpty()) {
+                    LOGGER.info("waiting current worker register to resource manager...");
+                    Thread.sleep(DEFAULT_WORKER_CHECK_INTERVAL);
+                }
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so that callers can still observe the interruption.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Forgets the worker registered under the removed member's address string.
+     *
+     * <p>NOTE(review): workers are stored under {@link WorkerProfile#getWorkerID()}; this removal only
+     * matches if that ID equals the address string - confirm.
+     *
+     * @param event the membership event describing the removed member
+     */
+    @Override
+    public void memberRemoved(MembershipServiceEvent event) {
+        String nodeID = event.getMember().getAddress().toString();
+        LOGGER.severe("Node heartbeat timeout, disconnected for resource manager. " +
+            "NodeID: " + nodeID);
+        registerWorker.remove(nodeID);
+    }
+
+    /**
+     * Applies for one slot per entry of {@code resourceProfile}, after waiting for a worker to register.
+     *
+     * @param jobId           the job the slots are requested for
+     * @param resourceProfile the resources each requested slot has to provide
+     * @return the future produced by a new {@link ResourceRequestHandler} for this request
+     */
+    @Override
+    public CompletableFuture<List<SlotProfile>> applyResources(long jobId,
+                                                               List<ResourceProfile> resourceProfile) throws NoEnoughResourceException {
+        waitingWorkerRegister();
+        return new ResourceRequestHandler(jobId, resourceProfile, registerWorker, this).request();
+    }
+
+    /**
+     * Tells whether {@link #findNewWorker(List)} may be used to obtain additional workers.
+     *
+     * @return {@code false} by default
+     */
+    protected boolean supportDynamicWorker() {
+        return false;
+    }
+
+    /**
+     * Finds a new worker in the third-party resource manager; it should return only after the worker
+     * has registered successfully.
+     *
+     * @param resourceProfiles the resource profiles the new worker has to satisfy
+     * @throws UnsupportedOperationException always, unless overridden
+     */
+    protected void findNewWorker(List<ResourceProfile> resourceProfiles) {
+        throw new UnsupportedOperationException("Unsupported operation to find new worker in " + this.getClass().getName());
+    }
+
+    /**
+     * Does nothing by default.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Invokes {@code operation} on the {@link SeaTunnelServer} service of the member at {@code address}.
+     *
+     * @param operation the operation to invoke
+     * @param address   the address of the target member
+     * @param <E>       the result type of the operation
+     * @return the future of the invocation
+     */
+    protected <E> InvocationFuture<E> sendToMember(Operation operation, Address address) {
+        InvocationBuilder invocationBuilder =
+            nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME,
+                operation, address);
+        return invocationBuilder.invoke();
+    }
+
+    /**
+     * Releases every given slot through {@link #releaseResource(long, SlotProfile)}.
+     *
+     * @param jobId    the job the slots belong to
+     * @param profiles the slots to release
+     * @return a future that completes once all releases are done, or fails if any release failed
+     */
+    @Override
+    public CompletableFuture<Void> releaseResources(long jobId, List<SlotProfile> profiles) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (SlotProfile profile : profiles) {
+            futures.add(releaseResource(jobId, profile));
+        }
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
+            if (e != null) {
+                completableFuture.completeExceptionally(e);
+            } else {
+                completableFuture.complete(null);
+            }
+        });
+        return completableFuture;
+    }
+
+    /**
+     * Sends a {@link ReleaseSlotOperation} for the slot to the address returned by {@code profile.getWorker()}.
+     *
+     * @param jobId   the job the slot belongs to
+     * @param profile the slot to release
+     * @return the future of the release invocation
+     */
+    @Override
+    public CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile) {
+        return sendToMember(new ReleaseSlotOperation(jobId, profile), profile.getWorker());
+    }
+
+    /**
+     * Records the latest profile of a worker. A worker seen for the first time is sent a
+     * {@link ResetResourceOperation}, and this method waits for it to finish before registering the worker.
+     *
+     * @param workerProfile the current profile of the worker
+     */
+    @Override
+    public void heartbeat(WorkerProfile workerProfile) {
+        if (!registerWorker.containsKey(workerProfile.getWorkerID())) {
+            LOGGER.info("received new worker register: " + workerProfile.getAddress());
+            sendToMember(new ResetResourceOperation(), workerProfile.getAddress()).join();
+        } else {
+            LOGGER.fine("received worker heartbeat from: " + workerProfile.getAddress());
+        }
+        registerWorker.put(workerProfile.getWorkerID(), workerProfile);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/NoEnoughResourceException.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/NoEnoughResourceException.java
index cb459944c..a4f5a2bff 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/NoEnoughResourceException.java
@@ -17,14 +17,15 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+/**
+ * Unchecked exception for a resource request that cannot be satisfied.
+ */
+public class NoEnoughResourceException extends RuntimeException {
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+    public NoEnoughResourceException() {
+    }
 
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+    public NoEnoughResourceException(String message) {
+        super(message);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index cb459944c..c8fa75c84 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -17,14 +17,46 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+import com.hazelcast.internal.services.MembershipServiceEvent;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
+/**
+ * Applies for and releases slots on behalf of jobs, and keeps track of worker heartbeats.
+ */
 public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+    /** Lifecycle hook; {@link AbstractResourceManager} implements it as a no-op. */
+    void init();
+
+    /** Requests a single slot for the job that satisfies {@code resourceProfile}. */
+    CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile) throws NoEnoughResourceException;
+
+    /** Requests one slot per entry of {@code resourceProfile} for the job. */
+    CompletableFuture<List<SlotProfile>> applyResources(long jobId, List<ResourceProfile> resourceProfile) throws NoEnoughResourceException;
+
+    /** Releases all of the given slots held for the job. */
+    CompletableFuture<Void> releaseResources(long jobId, List<SlotProfile> profiles);
+
+    /** Releases a single slot held for the job. */
+    CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile);
+
+    /**
+     * Every time ResourceManager and Worker communicate, heartbeat method should be called to
+     * record the latest Worker status
+     *
+     * @param workerProfile the current profile of the worker
+     */
+    void heartbeat(WorkerProfile workerProfile);
+
+    /** Handles the removal of the cluster member carried by {@code event}. */
+    void memberRemoved(MembershipServiceEvent event);
+
+    /** Lifecycle hook; {@link AbstractResourceManager} implements it as a no-op. */
+    void close();
 
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFactory.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFactory.java
new file mode 100644
index 000000000..82e968f53
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.DeployType;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.kubernetes.KubernetesResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.yarn.YarnResourceManager;
+
+import com.hazelcast.spi.impl.NodeEngine;
+
+/**
+ * Creates the {@link ResourceManager} implementation that matches a {@link DeployType}.
+ */
+public class ResourceManagerFactory {
+
+    private final NodeEngine nodeEngine;
+
+    public ResourceManagerFactory(NodeEngine nodeEngine) {
+        this.nodeEngine = nodeEngine;
+    }
+
+    /**
+     * Builds a new resource manager for the given deploy type.
+     *
+     * @param type the deploy type to build a resource manager for
+     * @return a new resource manager bound to this factory's node engine
+     * @throws UnsupportedDeployTypeException if {@code type} matches none of the supported deploy types
+     */
+    public ResourceManager getResourceManager(DeployType type) {
+        if (DeployType.STANDALONE.equals(type)) {
+            return new StandaloneResourceManager(nodeEngine);
+        }
+        if (DeployType.KUBERNETES.equals(type)) {
+            return new KubernetesResourceManager(nodeEngine);
+        }
+        if (DeployType.YARN.equals(type)) {
+            return new YarnResourceManager(nodeEngine);
+        }
+        throw new UnsupportedDeployTypeException(type);
+    }
+
+    /**
+     * Builds a resource manager for {@link DeployType#STANDALONE}.
+     */
+    public ResourceManager getResourceManager() {
+        return getResourceManager(DeployType.STANDALONE);
+    }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
new file mode 100644
index 000000000..a8fb6aeb1
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.DeployType;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Handles one batch of slot requests that the resource manager issues for a single job.
+ */
+public class ResourceRequestHandler {
+
+    private static final ILogger LOGGER = Logger.getLogger(ResourceRequestHandler.class);
+    /** Completed with all granted slots, or exceptionally when the request cannot be satisfied. */
+    private final CompletableFuture<List<SlotProfile>> completableFuture;
+    /*
+     * Caches the slots that have been requested successfully. The key is the index of the matching entry
+     * in resourceProfile; an index without a mapping means that request has not succeeded (yet).
+     */
+    private final ConcurrentMap<Integer, SlotProfile> resultSlotProfiles;
+    private final ConcurrentMap<String, WorkerProfile> registerWorker;
+
+    private final long jobId;
+
+    private final List<ResourceProfile> resourceProfile;
+
+    private final AbstractResourceManager resourceManager;
+
+    /**
+     * @param jobId           the job the slots are requested for
+     * @param resourceProfile the requested resources, one entry per slot
+     * @param registerWorker  the workers currently known to the resource manager
+     * @param resourceManager the resource manager used to reach workers and to release slots
+     */
+    public ResourceRequestHandler(long jobId,
+                                  List<ResourceProfile> resourceProfile,
+                                  ConcurrentMap<String, WorkerProfile> registerWorker,
+                                  AbstractResourceManager resourceManager) {
+        this.completableFuture = new CompletableFuture<>();
+        this.resultSlotProfiles = new ConcurrentHashMap<>();
+        this.jobId = jobId;
+        this.resourceProfile = resourceProfile;
+        this.registerWorker = registerWorker;
+        this.resourceManager = resourceManager;
+    }
+
+    /**
+     * Sends a slot request for every resource profile that some registered worker appears able to serve,
+     * then either completes the result, falls back to dynamic workers, or fails the whole request.
+     *
+     * @return a future that yields the granted slots in the order of the requested resource profiles
+     */
+    public CompletableFuture<List<SlotProfile>> request() {
+        List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new ArrayList<>();
+        for (int i = 0; i < resourceProfile.size(); i++) {
+            ResourceProfile r = resourceProfile.get(i);
+            Optional<WorkerProfile> workerProfile = preCheckWorkerResource(r);
+            if (workerProfile.isPresent()) {
+                // request slot to member
+                CompletableFuture<SlotAndWorkerProfile> internalCompletableFuture = singleResourceRequestToMember(i, r, workerProfile.get());
+                allRequestFuture.add(internalCompletableFuture);
+            }
+        }
+        // all resource preCheck done, also had sent request to worker
+        getAllOfFuture(allRequestFuture).whenComplete((unused, error) -> {
+            if (error != null) {
+                completeRequestWithException(error);
+                // The request has failed and its slots were released; do not run the shortage handling too.
+                return;
+            }
+            if (resultSlotProfiles.size() < resourceProfile.size()) {
+                // meaning have some slot not request success
+                if (resourceManager.supportDynamicWorker()) {
+                    applyByDynamicWorker();
+                } else {
+                    completeRequestWithException(new NoEnoughResourceException("can't apply resource request: " + resourceProfile.get(findNullIndexInResultSlotProfiles())));
+                }
+            }
+        });
+        return completableFuture;
+    }
+
+    /**
+     * @return the first request index that has no granted slot yet, or {@code -1} if there is none
+     */
+    private int findNullIndexInResultSlotProfiles() {
+        for (int i = 0; i < resourceProfile.size(); i++) {
+            if (!resultSlotProfiles.containsKey(i)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Releases every slot granted so far and fails the result future with {@code e}.
+     */
+    private void completeRequestWithException(Throwable e) {
+        releaseAllResourceInternal();
+        completableFuture.completeExceptionally(e);
+    }
+
+    /**
+     * Stores a granted slot under its request index and completes the result future once every request
+     * has a slot. A {@code null} slot is ignored.
+     */
+    private void addSlotToCacheMap(int index, SlotProfile slotProfile) {
+        if (null != slotProfile) {
+            resultSlotProfiles.put(index, slotProfile);
+            if (resultSlotProfiles.size() == resourceProfile.size()) {
+                List<SlotProfile> value = new ArrayList<>();
+                for (int i = 0; i < resultSlotProfiles.size(); i++) {
+                    value.add(resultSlotProfiles.get(i));
+                }
+                completableFuture.complete(value);
+            }
+        }
+    }
+
+    /**
+     * Sends a {@link RequestSlotOperation} to the worker; on success records the returned worker profile
+     * as a heartbeat and caches the returned slot under request index {@code i}.
+     */
+    private CompletableFuture<SlotAndWorkerProfile> singleResourceRequestToMember(int i, ResourceProfile r, WorkerProfile workerProfile) {
+        InvocationFuture<SlotAndWorkerProfile> future = resourceManager.sendToMember(new RequestSlotOperation(jobId, r), workerProfile.getAddress());
+        return future.whenComplete(
+            (slotAndWorkerProfile, error) -> {
+                if (error != null) {
+                    throw new RuntimeException(error);
+                } else {
+                    resourceManager.heartbeat(slotAndWorkerProfile.getWorkerProfile());
+                    addSlotToCacheMap(i, slotAndWorkerProfile.getSlotProfile());
+                }
+            }
+        );
+    }
+
+    /**
+     * Picks a registered worker that has an unassigned slot large enough for {@code r} or, failing that,
+     * enough unassigned resources for it.
+     *
+     * @return the chosen worker, or empty if no registered worker qualifies
+     */
+    private Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
+        // Check if there are still unassigned slots
+        Optional<WorkerProfile> workerProfile =
+            registerWorker.values().stream().filter(worker -> Arrays.stream(worker.getUnassignedSlots()).anyMatch(slot -> slot.getResourceProfile().enoughThan(r))).findAny();
+
+        if (!workerProfile.isPresent()) {
+            // Check if there are still unassigned resources
+            workerProfile =
+                registerWorker.values().stream().filter(worker -> worker.getUnassignedResource().enoughThan(r)).findAny();
+        }
+
+        return workerProfile;
+    }
+
+    /**
+     * When the {@link DeployType} supports dynamic workers and the resources of the current worker
+     * cannot meet the requirements of resource application, we can dynamically request the third-party
+     * resource management to create a new worker, and then complete the resource application
+     */
+    private void applyByDynamicWorker() {
+        List<ResourceProfile> needApplyResource = new ArrayList<>();
+        List<Integer> needApplyIndex = new ArrayList<>();
+        // Scan every request index; bounding the scan by the number of granted slots would miss the tail.
+        for (int i = 0; i < resourceProfile.size(); i++) {
+            if (!resultSlotProfiles.containsKey(i)) {
+                needApplyResource.add(resourceProfile.get(i));
+                needApplyIndex.add(i);
+            }
+        }
+        resourceManager.findNewWorker(needApplyResource);
+        resourceManager.applyResources(jobId, needApplyResource).whenComplete((s, e) -> {
+            if (e != null) {
+                completeRequestWithException(e);
+                return;
+            }
+            for (int i = 0; i < s.size(); i++) {
+                addSlotToCacheMap(needApplyIndex.get(i), s.get(i));
+            }
+        });
+    }
+
+    /**
+     * Asks the resource manager to release every slot granted so far.
+     */
+    private void releaseAllResourceInternal() {
+        LOGGER.warning("apply resource not success, release all already applied resource");
+        resultSlotProfiles.values().stream().filter(Objects::nonNull).forEach(profile -> {
+            resourceManager.releaseResource(jobId, profile);
+        });
+    }
+
+    /**
+     * @return a future that completes when all given futures complete, failing if any of them failed
+     */
+    private CompletableFuture<Void> getAllOfFuture(List<? extends CompletableFuture<?>> allRequestFuture) {
+        return CompletableFuture.allOf(allRequestFuture.toArray(new CompletableFuture[0]));
+    }
+
+}
+
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
deleted file mode 100644
index 28bbb81d1..000000000
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.resourcemanager;
-
-import org.apache.seatunnel.engine.common.exception.JobException;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-
-import com.hazelcast.cluster.Address;
-import com.hazelcast.spi.impl.NodeEngine;
-import lombok.Data;
-import lombok.NonNull;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Data
-public class SimpleResourceManager implements ResourceManager {
-
-    // TODO We may need more detailed resource define, instead of the resource definition method of only Address.
-    private Map<Long, Map<TaskGroupLocation, Address>> physicalVertexIdAndResourceMap = new HashMap<>();
-
-    private final NodeEngine nodeEngine;
-
-    public SimpleResourceManager(NodeEngine nodeEngine) {
-        this.nodeEngine = nodeEngine;
-    }
-
-    @SuppressWarnings("checkstyle:MagicNumber")
-    @Override
-    public Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation) {
-        Map<TaskGroupLocation, Address> jobAddressMap =
-            physicalVertexIdAndResourceMap.computeIfAbsent(jobId, k -> new HashMap<>());
-
-        Address localhost =
-            jobAddressMap.putIfAbsent(taskGroupLocation, nodeEngine.getThisAddress());
-        if (null == localhost) {
-            localhost = jobAddressMap.get(taskGroupLocation);
-        }
-
-        return localhost;
-
-    }
-
-    @Override
-    @NonNull
-    public Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation) {
-        Map<TaskGroupLocation, Address> longAddressMap = physicalVertexIdAndResourceMap.get(jobId);
-        if (null == longAddressMap || longAddressMap.isEmpty()) {
-            throw new JobException(
-                String.format("Job %s, Task %s can not found applied resource.", jobId, taskGroupLocation));
-        }
-
-        return longAddressMap.get(taskGroupLocation);
-    }
-}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/StandaloneResourceManager.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/StandaloneResourceManager.java
index cb459944c..704c9ff68 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/StandaloneResourceManager.java
@@ -17,14 +17,14 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import com.hazelcast.spi.impl.NodeEngine;
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+/**
+ * {@link AbstractResourceManager} that overrides nothing and so uses its default behavior throughout.
+ */
+public class StandaloneResourceManager extends AbstractResourceManager {
 
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+    public StandaloneResourceManager(NodeEngine nodeEngine) {
+        super(nodeEngine);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/UnsupportedDeployTypeException.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/UnsupportedDeployTypeException.java
index cb459944c..a1f091a29 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/UnsupportedDeployTypeException.java
@@ -17,14 +17,14 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.common.runtime.DeployType;
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+/**
+ * Unchecked exception whose message names the rejected {@link DeployType}, or "null" when none was given.
+ */
+public class UnsupportedDeployTypeException extends RuntimeException {
 
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+    public UnsupportedDeployTypeException(DeployType type) {
+        super("Unknown deploy type: " + (type != null ? type.name() : "null"));
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
index 558593551..c969425a6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.operation.sink;
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
 
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
@@ -29,46 +28,49 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 
-public class SinkUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+public class ReleaseSlotOperation extends Operation implements IdentifiedDataSerializable {
 
-    private TaskLocation currentTaskID;
-    private TaskLocation committerTaskID;
+    private long jobID;
+    private SlotProfile slotProfile;
 
-    public SinkUnregisterOperation() {
+    public ReleaseSlotOperation() {
     }
 
-    public SinkUnregisterOperation(TaskLocation currentTaskID, TaskLocation committerTaskID) {
-        this.currentTaskID = currentTaskID;
-        this.committerTaskID = committerTaskID;
+    public ReleaseSlotOperation(long jobID, SlotProfile slotProfile) {
+        this.jobID = jobID;
+        this.slotProfile = slotProfile;
     }
 
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        SinkAggregatedCommitterTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
-        task.receivedWriterUnregister(currentTaskID);
+        server.getSlotService().releaseSlot(jobID, slotProfile);
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        currentTaskID.writeData(out);
-        committerTaskID.writeData(out);
+        out.writeObject(slotProfile);
+        out.writeLong(jobID);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        currentTaskID.readData(in);
-        committerTaskID.readData(in);
+        slotProfile = in.readObject();
+        jobID = in.readLong();
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
     }
 
     @Override
     public int getFactoryId() {
-        return TaskDataSerializerHook.FACTORY_ID;
+        return ResourceDataSerializerHook.FACTORY_ID;
     }
 
     @Override
     public int getClassId() {
-        return TaskDataSerializerHook.SINK_UNREGISTER_TYPE;
+        return ResourceDataSerializerHook.RELEASE_SLOT_TYPE;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/RequestSlotOperation.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/RequestSlotOperation.java
index 558593551..56a7971be 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/RequestSlotOperation.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.operation.sink;
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
 
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
 
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
@@ -29,46 +29,55 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 
-public class SinkUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+public class RequestSlotOperation extends Operation implements IdentifiedDataSerializable {
 
-    private TaskLocation currentTaskID;
-    private TaskLocation committerTaskID;
+    private ResourceProfile resourceProfile;
+    private long jobID;
+    private SlotAndWorkerProfile result;
 
-    public SinkUnregisterOperation() {
+    public RequestSlotOperation() {
     }
 
-    public SinkUnregisterOperation(TaskLocation currentTaskID, TaskLocation committerTaskID) {
-        this.currentTaskID = currentTaskID;
-        this.committerTaskID = committerTaskID;
+    public RequestSlotOperation(long jobID, ResourceProfile resourceProfile) {
+        this.resourceProfile = resourceProfile;
+        this.jobID = jobID;
     }
 
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        SinkAggregatedCommitterTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
-        task.receivedWriterUnregister(currentTaskID);
+        result = server.getSlotService().requestSlot(jobID, resourceProfile);
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        currentTaskID.writeData(out);
-        committerTaskID.writeData(out);
+        out.writeObject(resourceProfile);
+        out.writeLong(jobID);
+    }
+
+    @Override
+    public Object getResponse() {
+        return result;
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        currentTaskID.readData(in);
-        committerTaskID.readData(in);
+        resourceProfile = in.readObject();
+        jobID = in.readLong();
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
     }
 
     @Override
     public int getFactoryId() {
-        return TaskDataSerializerHook.FACTORY_ID;
+        return ResourceDataSerializerHook.FACTORY_ID;
     }
 
     @Override
     public int getClassId() {
-        return TaskDataSerializerHook.SINK_UNREGISTER_TYPE;
+        return ResourceDataSerializerHook.REQUEST_SLOT_TYPE;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ResetResourceOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ResetResourceOperation.java
new file mode 100644
index 000000000..d7321419b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ResetResourceOperation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+public class ResetResourceOperation extends Operation implements IdentifiedDataSerializable {
+    public ResetResourceOperation() {
+    }
+
+    @Override
+    public void run() throws Exception {
+        SeaTunnelServer server = getService();
+        server.getSlotService().reset();
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return ResourceDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ResourceDataSerializerHook.RESET_RESOURCE_TYPE;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/WorkerHeartbeatOperation.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/WorkerHeartbeatOperation.java
index 558593551..5ea970c47 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/WorkerHeartbeatOperation.java
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.operation.sink;
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
 
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
@@ -29,46 +28,45 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 
-public class SinkUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+public class WorkerHeartbeatOperation extends Operation implements IdentifiedDataSerializable {
 
-    private TaskLocation currentTaskID;
-    private TaskLocation committerTaskID;
+    private WorkerProfile workerProfile;
 
-    public SinkUnregisterOperation() {
+    public WorkerHeartbeatOperation() {
     }
 
-    public SinkUnregisterOperation(TaskLocation currentTaskID, TaskLocation committerTaskID) {
-        this.currentTaskID = currentTaskID;
-        this.committerTaskID = committerTaskID;
+    public WorkerHeartbeatOperation(WorkerProfile workerProfile) {
+        this.workerProfile = workerProfile;
     }
 
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        SinkAggregatedCommitterTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
-        task.receivedWriterUnregister(currentTaskID);
+        server.getResourceManager().heartbeat(workerProfile);
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        currentTaskID.writeData(out);
-        committerTaskID.writeData(out);
+        out.writeObject(workerProfile);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        currentTaskID.readData(in);
-        committerTaskID.readData(in);
+        workerProfile = in.readObject();
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
     }
 
     @Override
     public int getFactoryId() {
-        return TaskDataSerializerHook.FACTORY_ID;
+        return ResourceDataSerializerHook.FACTORY_ID;
     }
 
     @Override
     public int getClassId() {
-        return TaskDataSerializerHook.SINK_UNREGISTER_TYPE;
+        return ResourceDataSerializerHook.WORKER_HEARTBEAT_TYPE;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/CPU.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/CPU.java
index cb459944c..6fda31285 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/CPU.java
@@ -15,16 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+public class CPU implements Resource {
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+    private final int core;
 
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+    private CPU(int core) {
+        this.core = core;
+    }
 
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+    public int getCore() {
+        return core;
+    }
+
+    public static CPU of(int core) {
+        return new CPU(core);
+    }
+
+    @Override
+    public String toString() {
+        return "CPU{" +
+            "core=" + core +
+            '}';
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Memory.java
similarity index 62%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Memory.java
index cb459944c..f94254995 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Memory.java
@@ -15,16 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+public class Memory implements Resource {
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+    private final long bytes;
 
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+    private Memory(long bytes) {
+        this.bytes = bytes;
+    }
 
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+    public long getBytes() {
+        return bytes;
+    }
+
+    public static Memory of(long bytes) {
+        return new Memory(bytes);
+    }
+
+    @Override
+    public String toString() {
+        return "Memory{" +
+            "bytes=" + bytes +
+            '}';
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Resource.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Resource.java
index cb459944c..c200e64b7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/Resource.java
@@ -15,16 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import java.io.Serializable;
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
-
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+/**
+ * Marker interface for SeaTunnel worker resources.
+ */
+public interface Resource extends Serializable {
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/ResourceProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/ResourceProfile.java
new file mode 100644
index 000000000..e687718e3
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/ResourceProfile.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
+
+    private final CPU cpu;
+
+    private final Memory heapMemory;
+
+    public ResourceProfile() {
+        this.cpu = CPU.of(0);
+        this.heapMemory = Memory.of(0);
+    }
+
+    public ResourceProfile(CPU cpu, Memory heapMemory) {
+        checkArgument(cpu.getCore() >= 0, "The cpu core cannot be negative");
+        checkArgument(heapMemory.getBytes() >= 0, "The heapMemory bytes cannot be negative");
+        this.cpu = cpu;
+        this.heapMemory = heapMemory;
+    }
+
+    public CPU getCpu() {
+        return cpu;
+    }
+
+    public Memory getHeapMemory() {
+        return heapMemory;
+    }
+
+    public ResourceProfile merge(ResourceProfile other) {
+        CPU c = CPU.of(this.cpu.getCore() + other.getCpu().getCore());
+        Memory m = Memory.of(this.heapMemory.getBytes() + other.heapMemory.getBytes());
+        return new ResourceProfile(c, m);
+    }
+
+    public ResourceProfile subtract(ResourceProfile other) {
+        CPU c = CPU.of(this.cpu.getCore() - other.getCpu().getCore());
+        Memory m = Memory.of(this.heapMemory.getBytes() - other.heapMemory.getBytes());
+        return new ResourceProfile(c, m);
+    }
+
+    public boolean enoughThan(ResourceProfile other) {
+        return this.cpu.getCore() >= other.getCpu().getCore()
+                && this.heapMemory.getBytes() >= other.getHeapMemory().getBytes();
+    }
+
+    @Override
+    public String toString() {
+        return "ResourceProfile{" +
+                "cpu=" + cpu +
+                ", heapMemory=" + heapMemory +
+                '}';
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
new file mode 100644
index 000000000..d79fe7151
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
+
+import com.hazelcast.cluster.Address;
+
+import java.io.Serializable;
+
+/**
+ * Describes the current status of a slot, including its resource size and assignment status.
+ */
+public class SlotProfile implements Serializable {
+
+    private final Address worker;
+
+    private final int slotID;
+
+    private long ownerJobID;
+
+    private volatile boolean assigned;
+
+    private final ResourceProfile resourceProfile;
+
+    public SlotProfile(Address worker, int slotID, ResourceProfile resourceProfile) {
+        this.worker = worker;
+        this.slotID = slotID;
+        this.resourceProfile = resourceProfile;
+    }
+
+    public Address getWorker() {
+        return worker;
+    }
+
+    public int getSlotID() {
+        return slotID;
+    }
+
+    public ResourceProfile getResourceProfile() {
+        return resourceProfile;
+    }
+
+    public long getOwnerJobID() {
+        return ownerJobID;
+    }
+
+    public void assign(long jobID) {
+        if (assigned) {
+            throw new UnsupportedOperationException();
+        } else {
+            ownerJobID = jobID;
+            assigned = true;
+        }
+    }
+
+    public void unassigned() {
+        assigned = false;
+    }
+
+    @Override
+    public String toString() {
+        return "SlotProfile{" +
+                "worker=" + worker +
+                ", slotID=" + slotID +
+                ", ownerJobID=" + ownerJobID +
+                ", assigned=" + assigned +
+                ", resourceProfile=" + resourceProfile +
+                '}';
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/CreateWorkerResult.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/CreateWorkerResult.java
index cb459944c..9181b9b9f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/CreateWorkerResult.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.thirdparty;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+public class CreateWorkerResult {
 
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+    private String message;
+
+    private WorkerProfile workerProfile;
+
+    private Throwable error;
 
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
index cb459944c..914562725 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.resourcemanager.thirdparty;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+import java.util.concurrent.CompletableFuture;
 
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+public interface ThirdPartyResourceManager {
+
+    CompletableFuture<CreateWorkerResult> createNewWorker(ResourceProfile resourceProfile);
+
+    CompletableFuture<Void> releaseWorker(String workerID);
 
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
new file mode 100644
index 000000000..bb9e6610d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.thirdparty.kubernetes;
+
+import org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.CreateWorkerResult;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.ThirdPartyResourceManager;
+
+import com.hazelcast.spi.impl.NodeEngine;
+
+import java.util.concurrent.CompletableFuture;
+
+public class KubernetesResourceManager extends AbstractResourceManager implements ThirdPartyResourceManager {
+
+    public KubernetesResourceManager(NodeEngine nodeEngine) {
+        super(nodeEngine);
+    }
+
+    @Override
+    public CompletableFuture<CreateWorkerResult> createNewWorker(ResourceProfile resourceProfile) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> releaseWorker(String workerID) {
+        return null;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
new file mode 100644
index 000000000..1c9abdf04
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.thirdparty.yarn;
+
+import org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.CreateWorkerResult;
+import org.apache.seatunnel.engine.server.resourcemanager.thirdparty.ThirdPartyResourceManager;
+
+import com.hazelcast.spi.impl.NodeEngine;
+
+import java.util.concurrent.CompletableFuture;
+
+public class YarnResourceManager extends AbstractResourceManager implements ThirdPartyResourceManager {
+    public YarnResourceManager(NodeEngine nodeEngine) {
+        super(nodeEngine);
+    }
+
+    @Override
+    public CompletableFuture<CreateWorkerResult> createNewWorker(ResourceProfile resourceProfile) {
+        return null; // TODO not implemented yet: returns null instead of a future, so callers must not chain on the result
+    }
+
+    @Override
+    public CompletableFuture<Void> releaseWorker(String workerID) {
+        return null; // TODO not implemented yet: returns null instead of a future, so callers must not chain on the result
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
new file mode 100644
index 000000000..329fd9dd1
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.worker;
+
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import com.hazelcast.cluster.Address;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * Describes the current state of a worker: its ID and address, its resources and which slots are assigned or unassigned.
+ */
+@Data
+public class WorkerProfile implements Serializable {
+
+    private final String workerID;
+
+    private final Address address;
+
+    private ResourceProfile profile;
+
+    private ResourceProfile unassignedResource;
+
+    private SlotProfile[] assignedSlots;
+
+    private SlotProfile[] unassignedSlots;
+
+    public WorkerProfile(String workerID, Address address) {
+        this.workerID = workerID;
+        this.address = address;
+        this.unassignedResource = new ResourceProfile(); // starts as a default ResourceProfile; the other mutable fields start as null
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index bb99792ff..6c561a709 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -26,19 +26,30 @@ import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 
+import com.google.common.collect.Lists;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 public class PipelineBaseScheduler implements JobScheduler {
     private static final ILogger LOGGER = Logger.getLogger(PipelineBaseScheduler.class);
     private final PhysicalPlan physicalPlan;
+
+    private final long jobId;
     private final JobMaster jobMaster;
     private final ResourceManager resourceManager;
 
@@ -46,6 +57,7 @@ public class PipelineBaseScheduler implements JobScheduler {
         this.physicalPlan = physicalPlan;
         this.jobMaster = jobMaster;
         this.resourceManager = jobMaster.getResourceManager();
+        this.jobId = physicalPlan.getJobImmutableInformation().getJobId();
     }
 
     @Override
@@ -56,18 +68,23 @@ public class PipelineBaseScheduler implements JobScheduler {
                     handlePipelineStateUpdateError(pipeline, PipelineState.SCHEDULED);
                     return null;
                 }
-                if (!applyResourceForPipeline(pipeline)) {
-                    return null;
+                Map<PhysicalVertex, SlotProfile> slotProfiles;
+                try {
+                    slotProfiles = applyResourceForPipeline(pipeline);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
+                pipeline.whenComplete((state, error) -> releasePipelineResource(Lists.newArrayList(slotProfiles.values())));
                 // deploy pipeline
                 return CompletableFuture.supplyAsync(() -> {
-                    deployPipeline(pipeline);
+                    // TODO before deploying, check that the slotProfiles still exist, because they may no longer be usable on retry.
+                    deployPipeline(pipeline, slotProfiles);
                     return null;
                 });
-            }).filter(x -> x != null).collect(Collectors.toList());
+            }).filter(Objects::nonNull).collect(Collectors.toList());
             try {
                 CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
-                    collect.toArray(new CompletableFuture[collect.size()]));
+                    collect.toArray(new CompletableFuture[0]));
                 voidCompletableFuture.get();
             } catch (Exception e) {
                 // cancel pipeline and throw an exception
@@ -80,64 +97,73 @@ public class PipelineBaseScheduler implements JobScheduler {
         }
     }
 
-    private boolean applyResourceForPipeline(@NonNull SubPlan subPlan) {
-        try {
-            // apply resource for coordinators
-            subPlan.getCoordinatorVertexList().forEach(coordinator -> applyResourceForTask(coordinator));
-
-            // apply resource for other tasks
-            subPlan.getPhysicalVertexList().forEach(task -> applyResourceForTask(task));
-        } catch (JobNoEnoughResourceException e) {
-            LOGGER.severe(e);
-            return false;
+    private void releasePipelineResource(List<SlotProfile> slotProfiles) {
+        if (null == slotProfiles || slotProfiles.isEmpty()) {
+            return;
         }
-
-        return true;
+        resourceManager.releaseResources(jobId, slotProfiles).join();
     }
 
-    private void applyResourceForTask(PhysicalVertex task) {
+    private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) throws Exception {
         try {
-            // TODO If there is no enough resources for tasks, we need add some wait profile
-            if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
-                resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                    task.getTaskGroup().getTaskGroupLocation());
-            } else {
-                handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+            Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
+            Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
+            subPlan.getCoordinatorVertexList().forEach(coordinator -> {
+                coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED); // coordinators only change state here; no slot is requested for them
+            });
+
+            subPlan.getPhysicalVertexList().forEach(task -> {
+                // TODO If there are not enough resources for the tasks, we need to add some wait profile
+                if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+                    // TODO custom resource size
+                    futures.put(task, resourceManager.applyResource(jobId, new ResourceProfile()));
+                } else {
+                    handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+                }
+            });
+            for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
+                try {
+                    slotProfiles.put(future.getKey(), future.getValue().get()); // blocks until this slot request has been answered
+                } catch (NoEnoughResourceException e) { // NOTE(review): Future.get() wraps failures in ExecutionException, so this catch looks unreachable - confirm and unwrap the cause
+                    // TODO custom exception with pipelineID, jobName etc.
+                    throw new JobNoEnoughResourceException("No enough resource to execute pipeline", e);
+                }
             }
-        } catch (JobNoEnoughResourceException e) {
+            return slotProfiles;
+        } catch (JobNoEnoughResourceException | ExecutionException | InterruptedException e) {
             LOGGER.severe(e);
+            throw e;
         }
     }
 
-    private CompletableFuture<Void> deployTask(PhysicalVertex task) {
+    private CompletableFuture<Void> deployTask(PhysicalVertex task, Supplier<Void> deployMethod) {
         if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
             // deploy is a time-consuming operation, so we do it async
-            return CompletableFuture.supplyAsync(() -> {
-                task.deploy(
-                    resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                        task.getTaskGroup().getTaskGroupLocation()));
-                return null;
-            });
+            return CompletableFuture.supplyAsync(deployMethod);
         } else {
             handleTaskStateUpdateError(task, ExecutionState.DEPLOYING);
         }
         return null;
     }
 
-    private void deployPipeline(@NonNull SubPlan pipeline) {
+    private void deployPipeline(@NonNull SubPlan pipeline, Map<PhysicalVertex, SlotProfile> slotProfiles) {
         if (pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING)) {
-            List<CompletableFuture> deployCoordinatorFuture =
-                pipeline.getCoordinatorVertexList().stream().map(task -> deployTask(task)).filter(x -> x != null)
-                    .collect(Collectors.toList());
+            List<CompletableFuture<?>> deployCoordinatorFuture =
+                pipeline.getCoordinatorVertexList().stream().map(coordinator -> deployTask(coordinator, () -> {
+                    coordinator.deployOnMaster();
+                    return null;
+                })).filter(Objects::nonNull).collect(Collectors.toList());
 
-            List<CompletableFuture> deployTaskFuture =
-                pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task)).filter(x -> x != null)
-                    .collect(Collectors.toList());
+            List<CompletableFuture<?>> deployTaskFuture =
+                pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task, () -> {
+                    task.deploy(slotProfiles.get(task));
+                    return null;
+                })).filter(Objects::nonNull).collect(Collectors.toList());
 
             try {
                 deployCoordinatorFuture.addAll(deployTaskFuture);
                 CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
-                    deployCoordinatorFuture.toArray(new CompletableFuture[deployCoordinatorFuture.size()]));
+                    deployCoordinatorFuture.toArray(new CompletableFuture[0]));
                 voidCompletableFuture.get();
                 if (!pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING)) {
                     LOGGER.info(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
new file mode 100644
index 000000000..dfa2d753b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
+
+import com.hazelcast.internal.serialization.DataSerializerHook;
+import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class ResourceDataSerializerHook implements DataSerializerHook {
+
+    // Type ids of the resource operations; Factory#create maps each id back to a new operation instance.
+    public static final int WORKER_HEARTBEAT_TYPE = 1;
+
+    public static final int REQUEST_SLOT_TYPE = 2;
+
+    public static final int RELEASE_SLOT_TYPE = 3;
+
+    public static final int RESET_RESOURCE_TYPE = 4;
+
+    public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
+            SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
+            SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
+    );
+
+    @Override
+    public int getFactoryId() {
+        return FACTORY_ID;
+    }
+
+    @Override
+    public DataSerializableFactory createFactory() {
+        return new Factory();
+    }
+
+    private static class Factory implements DataSerializableFactory { // instantiates the operation class registered for a type id
+
+        @Override
+        public IdentifiedDataSerializable create(int typeId) {
+            switch (typeId) {
+                case WORKER_HEARTBEAT_TYPE:
+                    return new WorkerHeartbeatOperation();
+                case REQUEST_SLOT_TYPE:
+                    return new RequestSlotOperation();
+                case RELEASE_SLOT_TYPE:
+                    return new ReleaseSlotOperation();
+                case RESET_RESOURCE_TYPE:
+                    return new ResetResourceOperation();
+                default:
+                    throw new IllegalArgumentException("Unknown type id " + typeId); // ids outside 1..4 are rejected
+            }
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
new file mode 100644
index 000000000..5c60d25cf
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.service.slot;
+
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The slot service of the SeaTunnel server, used to manage the slots of a worker.
+ */
+public class DefaultSlotService implements SlotService {
+
+    private static final ILogger LOGGER = Logger.getLogger(DefaultSlotService.class);
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT = 2000;
+    private static final int HEARTBEAT_RETRY_TIME = 5;
+    private final NodeEngineImpl nodeEngine;
+
+    private AtomicReference<ResourceProfile> unassignedResource;
+
+    private AtomicReference<ResourceProfile> assignedResource;
+
+    private ConcurrentMap<Integer, SlotProfile> assignedSlots;
+
+    private ConcurrentMap<Integer, SlotProfile> unassignedSlots;
+    private ScheduledExecutorService scheduledExecutorService;
+    private final String serviceID;
+    private final boolean dynamicSlot;
+    private final int slotNumber;
+    private volatile boolean initStatus;
+    private final IdGenerator idGenerator;
+    private final TaskExecutionService taskExecutionService;
+    private ConcurrentMap<Integer, SlotContext> contexts;
+
+    public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot, int slotNumber) {
+        this.nodeEngine = nodeEngine;
+        this.dynamicSlot = dynamicSlot; // false: init() pre-creates a fixed set of slots; true: a slot is created per request
+        this.taskExecutionService = taskExecutionService;
+        this.slotNumber = slotNumber; // number of slots pre-created in fixed-slot mode
+        this.serviceID = nodeEngine.getThisAddress().toString(); // used as the worker ID in the WorkerProfile sent to the master
+        this.idGenerator = new IdGenerator(); // source of slot IDs for dynamically created slots
+    }
+
+    @Override
+    public void init() {
+        initStatus = true; // stays true until the first requestSlot(); reset() only re-initialises when it is false
+        contexts = new ConcurrentHashMap<>();
+        assignedSlots = new ConcurrentHashMap<>();
+        unassignedSlots = new ConcurrentHashMap<>();
+        unassignedResource = new AtomicReference<>(new ResourceProfile());
+        assignedResource = new AtomicReference<>(new ResourceProfile());
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,
+            String.format("hz.%s.seaTunnel.slotService.thread", nodeEngine.getHazelcastInstance().getName())));
+        if (!dynamicSlot) {
+            initFixedSlots();
+        }
+        unassignedResource.set(getNodeResource()); // initially the whole node resource counts as unassigned
+        scheduledExecutorService.scheduleAtFixedRate(() -> { // heartbeat: periodically send this worker's profile to the master
+            try {
+                RetryUtils.retryWithException(() -> {
+                    LOGGER.fine("start send heartbeat to resource manager, this address: " + nodeEngine.getClusterService().getThisAddress());
+                    sendToMaster(new WorkerHeartbeatOperation(toWorkerProfile())).join();
+                    return null;
+                }, new RetryUtils.RetryMaterial(HEARTBEAT_RETRY_TIME, true, e -> true, DEFAULT_HEARTBEAT_TIMEOUT));
+            } catch (Exception e) { // a failed heartbeat is only logged; the next scheduled run tries again
+                LOGGER.severe(e);
+                LOGGER.severe("failed send heartbeat to resource manager, will retry later. this address: " + nodeEngine.getClusterService().getThisAddress());
+            }
+        }, 0, DEFAULT_HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void reset() {
+        if (!initStatus) { // nothing to do when no slot has been requested since the last init()
+            synchronized (this) {
+                if (!initStatus) { // re-check under the lock: init() sets the flag back to true
+                    this.close();
+                    init();
+                }
+            }
+        }
+    }
+
+    @Override
+    public synchronized SlotAndWorkerProfile requestSlot(long jobId, ResourceProfile resourceProfile) {
+        initStatus = false; // marks the service as used, so a later reset() really re-initialises it
+        LOGGER.info(String.format("received slot request, jobID: %d, resource profile: %s", jobId, resourceProfile));
+        SlotProfile profile = selectBestMatchSlot(resourceProfile);
+        if (profile != null) { // null means no slot on this worker can satisfy the request
+            profile.assign(jobId);
+            assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
+            unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
+            unassignedSlots.remove(profile.getSlotID());
+            assignedSlots.put(profile.getSlotID(), profile);
+            contexts.computeIfAbsent(profile.getSlotID(), p -> new SlotContext(profile.getSlotID(), taskExecutionService));
+        }
+        return new SlotAndWorkerProfile(toWorkerProfile(), profile); // the slot part is null when nothing could be assigned
+    }
+
+    public SlotContext getSlotContext(int slotID) {
+        return contexts.get(slotID); // null when the slot has no context (contexts are added in requestSlot and removed in releaseSlot)
+    }
+
+    @Override
+    public synchronized void releaseSlot(long jobId, SlotProfile profile) { // synchronized like requestSlot: both move slots between the same maps
+        LOGGER.info(String.format("received slot release request, jobID: %d, slot: %s", jobId, profile));
+        if (!assignedSlots.containsKey(profile.getSlotID())) {
+            throw new WrongTargetSlotException("Not exist this slot in slot service, slot profile: " + profile);
+        }
+
+        if (assignedSlots.get(profile.getSlotID()).getOwnerJobID() != jobId) { // only the job that owns the slot may release it
+            throw new WrongTargetSlotException(String.format("The profile %s not belong with job %d",
+                    assignedSlots.get(profile.getSlotID()), jobId));
+        }
+
+        assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
+        unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
+        profile.unassigned();
+        if (!dynamicSlot) { // fixed slots go back to the free pool; dynamic slots are simply dropped
+            unassignedSlots.put(profile.getSlotID(), profile);
+        }
+        assignedSlots.remove(profile.getSlotID());
+        contexts.remove(profile.getSlotID());
+    }
+
+    @Override
+    public void close() {
+        scheduledExecutorService.shutdown(); // NOTE(review): the executor is only created in init(); close() or reset() before init() would NPE here - confirm init() always runs first
+    }
+
+    // Picks the slot to hand out for the requested profile; returns null when nothing on this worker fits.
+    private SlotProfile selectBestMatchSlot(ResourceProfile profile) {
+        if (dynamicSlot) {
+            // Dynamic mode: create a new slot of exactly the requested size if the unassigned resource allows it.
+            if (unassignedResource.get().enoughThan(profile)) {
+                return new SlotProfile(nodeEngine.getThisAddress(), (int) idGenerator.getNextId(), profile);
+            }
+            return null;
+        }
+        // Fixed mode: among the free slots that are large enough, take the smallest one
+        // (least heap memory first, then fewest CPU cores). An empty pool yields null.
+        Optional<SlotProfile> result = unassignedSlots.values().stream()
+                .filter(slot -> slot.getResourceProfile().enoughThan(profile))
+                .min((slot1, slot2) -> {
+                    ResourceProfile first = slot1.getResourceProfile();
+                    ResourceProfile second = slot2.getResourceProfile();
+                    int byHeap = Long.compare(first.getHeapMemory().getBytes(), second.getHeapMemory().getBytes());
+                    // Long/Integer.compare instead of subtraction so the ordering cannot overflow.
+                    return byHeap != 0 ? byHeap : Integer.compare(first.getCpu().getCore(), second.getCpu().getCore());
+                });
+        return result.orElse(null);
+    }
+
+    private void initFixedSlots() {
+        long maxMemory = Runtime.getRuntime().maxMemory(); // JVM max memory, split evenly across the fixed slots below
+        for (int i = 0; i < slotNumber; i++) { // slot IDs are 0 .. slotNumber - 1
+            unassignedSlots.put(i, new SlotProfile(nodeEngine.getThisAddress(), i,
+                    new ResourceProfile(CPU.of(0), Memory.of(maxMemory / slotNumber))));
+        }
+    }
+
+    public WorkerProfile toWorkerProfile() { // builds a fresh snapshot of this worker's state on every call
+        WorkerProfile workerProfile = new WorkerProfile(serviceID, nodeEngine.getThisAddress());
+        workerProfile.setProfile(getNodeResource());
+        workerProfile.setAssignedSlots(assignedSlots.values().toArray(new SlotProfile[0])); // array copy of the current map values
+        workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new SlotProfile[0]));
+        workerProfile.setUnassignedResource(unassignedResource.get());
+        return workerProfile;
+    }
+
+    private ResourceProfile getNodeResource() {
+        return new ResourceProfile(CPU.of(0), Memory.of(Runtime.getRuntime().maxMemory())); // CPU is always reported as 0; memory is the JVM max memory
+    }
+
+    public <E> InvocationFuture<E> sendToMaster(Operation operation) {
+        InvocationBuilder invocationBuilder = nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress()); // addressed to the current master
+        return invocationBuilder.invoke(); // the returned future is left for the caller to wait on
+    }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
similarity index 52%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
index d350019b7..2523e79d2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
@@ -15,32 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.service.slot;
 
-import lombok.NonNull;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 
-import java.io.IOException;
 import java.io.Serializable;
 
-public interface Task extends Serializable {
+public class SlotAndWorkerProfile implements Serializable {
 
-    default void init() throws Exception {
-    }
-
-    @NonNull
-    ProgressState call() throws Exception;
+    private final WorkerProfile workerProfile;
 
-    @NonNull
-    Long getTaskID();
+    private final SlotProfile slotProfile;
 
-    default boolean isThreadsShare() {
-        return false;
+    public SlotAndWorkerProfile(WorkerProfile workerProfile, SlotProfile slotProfile) {
+        this.workerProfile = workerProfile;
+        this.slotProfile = slotProfile;
     }
 
-    default void close() throws IOException {
+    public WorkerProfile getWorkerProfile() {
+        return workerProfile;
     }
 
-    default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext){
+    /**
+     * Get the slot profile returned by the worker. Could be null if no slot can be provided.
+     */
+    public SlotProfile getSlotProfile() {
+        return slotProfile;
     }
-
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
index d350019b7..d975de191 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotContext.java
@@ -15,32 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.service.slot;
 
-import lombok.NonNull;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
 
-import java.io.IOException;
-import java.io.Serializable;
+public class SlotContext {
+    private final TaskExecutionService taskExecutionService;
+    private final int slotID;
 
-public interface Task extends Serializable {
-
-    default void init() throws Exception {
-    }
-
-    @NonNull
-    ProgressState call() throws Exception;
-
-    @NonNull
-    Long getTaskID();
-
-    default boolean isThreadsShare() {
-        return false;
+    public SlotContext(int slotID, TaskExecutionService taskExecutionService) {
+        this.slotID = slotID;
+        this.taskExecutionService = taskExecutionService;
+        this.taskExecutionService.setSlotContext(this);
     }
 
-    default void close() throws IOException {
+    public int getSlotID() {
+        return slotID;
     }
 
-    default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext){
+    public TaskExecutionService getTaskExecutionService() {
+        return taskExecutionService;
     }
 
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
index cb459944c..5d5a5ab14 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
@@ -15,16 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.service.slot;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+public interface SlotService {
 
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
+    void init();
+
+    void reset();
+
+    SlotAndWorkerProfile requestSlot(long jobID, ResourceProfile resourceProfile);
+
+    SlotContext getSlotContext(int slotID);
+
+    void releaseSlot(long jobId, SlotProfile slotProfile);
+
+    void close();
 
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/WrongTargetSlotException.java
similarity index 66%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/WrongTargetSlotException.java
index cb459944c..16a3dae43 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/WrongTargetSlotException.java
@@ -15,16 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.resourcemanager;
+package org.apache.seatunnel.engine.server.service.slot;
 
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+public class WrongTargetSlotException extends RuntimeException {
 
-import com.hazelcast.cluster.Address;
-import lombok.NonNull;
+    public WrongTargetSlotException() {
+    }
 
-public interface ResourceManager {
-    Address applyForResource(long jobId, TaskGroupLocation taskGroupLocation);
-
-    @NonNull
-    Address getAppliedResource(long jobId, TaskGroupLocation taskGroupLocation);
+    public WrongTargetSlotException(String message) {
+        super(message);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 4f7c64afc..4797ea914 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -57,7 +57,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     private Map<TaskLocation, Address> taskMemberMapping;
     private Map<Long, TaskLocation> taskIDToTaskLocationMapping;
 
-    private EnumeratorState currState;
+    private volatile EnumeratorState currState;
 
     private CompletableFuture<Void> readerRegisterFuture;
     private CompletableFuture<Void> readerFinishFuture;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 1667caf4d..155307a6e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.operation;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.operation.AsyncOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
 import com.hazelcast.internal.nio.IOUtil;
@@ -32,18 +33,21 @@ import java.io.IOException;
 
 public class DeployTaskOperation extends AsyncOperation {
     private Data taskImmutableInformation;
+    private SlotProfile slotProfile;
 
     public DeployTaskOperation() {
     }
 
-    public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
+    public DeployTaskOperation(@NonNull SlotProfile slotProfile, @NonNull Data taskImmutableInformation) {
         this.taskImmutableInformation = taskImmutableInformation;
+        this.slotProfile = slotProfile;
     }
 
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
         SeaTunnelServer server = getService();
-        return server.getTaskExecutionService().deployTask(taskImmutableInformation);
+        return server.getSlotService().getSlotContext(slotProfile.getSlotID())
+                .getTaskExecutionService().deployTask(taskImmutableInformation);
     }
 
     @Override
@@ -55,11 +59,13 @@ public class DeployTaskOperation extends AsyncOperation {
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
         IOUtil.writeData(out, taskImmutableInformation);
+        out.writeObject(slotProfile);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
         taskImmutableInformation = IOUtil.readData(in);
+        slotProfile = in.readObject();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index ccbc8d519..ccc2d0bce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -36,7 +36,7 @@ import java.io.IOException;
 public class SinkRegisterOperation extends Operation implements IdentifiedDataSerializable {
 
     private static final ILogger LOGGER = Logger.getLogger(SinkRegisterOperation.class);
-    private static final int RETRY_TIME = 5;
+    private static final int RETRY_NUMBER = 5;
     private static final int RETRY_INTERVAL = 2000;
     private TaskLocation writerTaskID;
     private TaskLocation committerTaskID;
@@ -54,10 +54,9 @@ public class SinkRegisterOperation extends Operation implements IdentifiedDataSe
         SeaTunnelServer server = getService();
         Address readerAddress = getCallerAddress();
         SinkAggregatedCommitterTask<?> task = null;
-        for (int i = 0; i < RETRY_TIME; i++) {
+        for (int i = 0; i < RETRY_NUMBER; i++) {
             try {
-                task = server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation())
-                        .getTaskGroup().getTask(committerTaskID.getTaskID());
+                task = server.getTaskExecutionService().getTask(committerTaskID);
                 break;
             } catch (NullPointerException e) {
                 LOGGER.warning("can't get committer task , waiting task started");
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
index 558593551..84d523b80 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
@@ -46,7 +46,7 @@ public class SinkUnregisterOperation extends Operation implements IdentifiedData
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         SinkAggregatedCommitterTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation()).getTaskGroup().getTask(committerTaskID.getTaskID());
+                server.getTaskExecutionService().getTask(committerTaskID);
         task.receivedWriterUnregister(currentTaskID);
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index ddbe9ae59..a2ccdc4ba 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -50,9 +50,7 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
-            SourceSeaTunnelTask<?, SplitT> task =
-                server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupLocation()).getTaskGroup()
-                    .getTask(taskID.getTaskID());
+            SourceSeaTunnelTask<?, SplitT> task = server.getTaskExecutionService().getTask(taskID);
             task.receivedSourceSplit(splits);
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
index f25b895b3..7533f75e6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
@@ -46,9 +46,7 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
-            SourceSeaTunnelTask<?, ?> task =
-                server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupLocation())
-                    .getTaskGroup().getTask(readerLocation.getTaskID());
+            SourceSeaTunnelTask<?, ?> task = server.getTaskExecutionService().getTask(readerLocation);
             task.close();
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index d214368a1..ba853833f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -50,9 +50,7 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
         SeaTunnelServer server = getService();
 
         RetryUtils.retryWithException(() -> {
-            SourceSplitEnumeratorTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
-                    .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+            SourceSplitEnumeratorTask<?> task = server.getTaskExecutionService().getTask(enumeratorTaskID);
             task.requestSplit(taskID.getTaskID());
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 58b09146e..0cba1c1f7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -49,8 +49,7 @@ public class SourceNoMoreElementOperation extends Operation implements Identifie
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
             SourceSplitEnumeratorTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
-                    .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+                server.getTaskExecutionService().getTask(enumeratorTaskID);
             task.readerFinished(currentTaskID.getTaskID());
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index a2f08bf70..f3c88c25b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -57,8 +57,7 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
         Address readerAddress = getCallerAddress();
         RetryUtils.retryWithException(() -> {
             SourceSplitEnumeratorTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupLocation()).getTaskGroup()
-                    .getTask(enumeratorTaskID.getTaskID());
+                server.getTaskExecutionService().getTask(enumeratorTaskID);
             task.receivedReader(readerTaskID, readerAddress);
             return null;
         }, new RetryUtils.RetryMaterial(RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
index bf045cd23..ced85edfa 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -16,4 +16,5 @@
 #
 
 org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook
-org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook
\ No newline at end of file
+org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook
+org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
new file mode 100644
index 000000000..89274d2a6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class ResourceManagerTest extends AbstractSeaTunnelServerTest {
+
+    private ResourceManager resourceManager;
+
+    private final long jobId = 5;
+
+    @Before
+    public void before() {
+        super.before();
+        resourceManager = server.getResourceManager();
+        server.getSlotService();
+    }
+
+    @Test
+    public void testApplyRequest() throws ExecutionException, InterruptedException {
+        List<ResourceProfile> resourceProfiles = new ArrayList<>();
+        resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(100)));
+        resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(200)));
+        resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(300)));
+        List<SlotProfile> slotProfiles = resourceManager.applyResources(jobId, resourceProfiles).get();
+
+        Assert.assertEquals(resourceProfiles.get(0).getHeapMemory().getBytes(), slotProfiles.get(0).getResourceProfile().getHeapMemory().getBytes());
+        Assert.assertEquals(resourceProfiles.get(1).getHeapMemory().getBytes(), slotProfiles.get(1).getResourceProfile().getHeapMemory().getBytes());
+        Assert.assertEquals(resourceProfiles.get(2).getHeapMemory().getBytes(), slotProfiles.get(2).getResourceProfile().getHeapMemory().getBytes());
+
+        Assert.assertThrows(ExecutionException.class, () -> resourceManager.releaseResources(jobId + 1, slotProfiles).get());
+
+        resourceManager.releaseResources(jobId, slotProfiles).get();
+
+        Assert.assertThrows(ExecutionException.class, () -> resourceManager.releaseResources(jobId, slotProfiles).get());
+
+        Assert.assertThrows(ExecutionException.class, () -> resourceManager.applyResource(jobId, new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE))).get());
+    }
+
+}