You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/08/31 04:09:53 UTC

[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2472: [Engine][ResourceManager] Add ResourceManager

hailin0 commented on code in PR #2472:
URL: https://github.com/apache/incubator-seatunnel/pull/2472#discussion_r959065964


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -75,8 +81,20 @@ public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelCon
         logger.info("SeaTunnel server start...");
     }
 
-    public TaskExecutionService getTaskExecutionService() {
-        return this.taskExecutionService;
+    /**
+     * Lazy load for Slot Service
+     */
+    public SlotService getSlotService() {
+        if (slotService == null) {
+            synchronized (this) {
+                if (slotService == null) {
+                    SlotService service = new DefaultSlotService(nodeEngine, taskExecutionService, true, 2);

Review Comment:
   Should `slotNumber` be read from the configuration instead of being hard-coded to `2`?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatListener;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatManager;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+    private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
+    private static final ILogger LOGGER = Logger.getLogger(AbstractResourceManager.class);
+
+    protected final Map<String, WorkerProfile> registerWorker;
+
+    private final HeartbeatManager heartbeatManager;
+    private final NodeEngine nodeEngine;
+
+    private final ExecutionMode mode = ExecutionMode.LOCAL;
+
+    public AbstractResourceManager(NodeEngine nodeEngine) {
+        this.registerWorker = new ConcurrentHashMap<>();
+        this.nodeEngine = nodeEngine;
+        this.heartbeatManager = new HeartbeatManager(new WorkerHeartbeatListener());
+    }
+
+    @Override
+    public void init() {
+        heartbeatManager.start(Executors.newSingleThreadScheduledExecutor());

Review Comment:
   Use a `ThreadFactory` to define the thread name?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
+
+import com.hazelcast.cluster.Address;
+
+import java.io.Serializable;
+
+public class SlotProfile implements Serializable {
+
+    private final Address worker;
+
+    private final int slotID;
+
+    private long ownerJobID;
+
+    private volatile boolean assigned;
+
+    private final ResourceProfile resourceProfile;
+
+    public SlotProfile(Address worker, int slotID, ResourceProfile resourceProfile) {
+        this.worker = worker;
+        this.slotID = slotID;
+        this.resourceProfile = resourceProfile;
+    }
+
+    public Address getWorker() {
+        return worker;
+    }
+
+    public int getSlotID() {
+        return slotID;
+    }
+
+    public ResourceProfile getResourceProfile() {
+        return resourceProfile;
+    }
+
+    public long getOwnerJobID() {
+        return ownerJobID;
+    }
+
+    public void assign(long jobID) {
+        if (assigned) {
+            throw new UnsupportedOperationException();
+        } else {
+            ownerJobID = jobID;
+            assigned = true;
+        }
+    }
+
+    public void unassigned() {
+        assigned = false;
+    }

Review Comment:
   Is there concurrent access here?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.service.slot;
+
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The slot service of seatunnel server, used for manage slot in worker.
+ */
+public class DefaultSlotService implements SlotService {
+
+    private static final ILogger LOGGER = Logger.getLogger(DefaultSlotService.class);
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT = 2000;
+    private static final int HEARTBEAT_RETRY_TIME = 5;
+    private final NodeEngineImpl nodeEngine;
+
+    private AtomicReference<ResourceProfile> unassignedResource;
+
+    private AtomicReference<ResourceProfile> assignedResource;
+
+    private Map<Integer, SlotProfile> assignedSlots;
+
+    private Map<Integer, SlotProfile> unassignedSlots;
+    private ScheduledExecutorService scheduledExecutorService;
+    private final String serviceID;
+    private final boolean dynamicSlot;
+    private final int slotNumber;
+    private final IdGenerator idGenerator;
+    private final TaskExecutionService taskExecutionService;
+    private Map<Integer, SlotContext> contexts;
+
+    public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot, int slotNumber) {
+        this.nodeEngine = nodeEngine;
+        this.dynamicSlot = dynamicSlot;
+        this.taskExecutionService = taskExecutionService;
+        this.slotNumber = slotNumber;
+        this.serviceID = nodeEngine.getThisAddress().toString();
+        this.idGenerator = new IdGenerator();
+    }
+
+    @Override
+    public void init() {
+        contexts = new ConcurrentHashMap<>();
+        assignedSlots = new ConcurrentHashMap<>();
+        unassignedSlots = new ConcurrentHashMap<>();
+        unassignedResource = new AtomicReference<>(new ResourceProfile());
+        assignedResource = new AtomicReference<>(new ResourceProfile());
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   Use a `ThreadFactory` to define the thread name?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatListener;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatManager;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+    private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
+    private static final ILogger LOGGER = Logger.getLogger(AbstractResourceManager.class);
+
+    protected final Map<String, WorkerProfile> registerWorker;
+
+    private final HeartbeatManager heartbeatManager;
+    private final NodeEngine nodeEngine;
+
+    private final ExecutionMode mode = ExecutionMode.LOCAL;
+
+    public AbstractResourceManager(NodeEngine nodeEngine) {
+        this.registerWorker = new ConcurrentHashMap<>();
+        this.nodeEngine = nodeEngine;
+        this.heartbeatManager = new HeartbeatManager(new WorkerHeartbeatListener());
+    }
+
+    @Override
+    public void init() {
+        heartbeatManager.start(Executors.newSingleThreadScheduledExecutor());
+    }
+
+    @Override
+    public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile) throws NoEnoughResourceException {
+        waitingWorkerRegister();
+        CompletableFuture<SlotProfile> completableFuture = new CompletableFuture<>();
+        applyResources(jobId, Collections.singletonList(resourceProfile)).whenComplete((profile, error) -> {
+            if (error != null) {
+                completableFuture.completeExceptionally(error);
+            } else {
+                completableFuture.complete(profile.get(0));
+            }
+        });
+        return completableFuture;
+    }
+
+    private void waitingWorkerRegister() {
+        if (ExecutionMode.LOCAL.equals(mode)) {
+            // Local mode, should wait worker(master node) register.
+            try {
+                while (registerWorker.isEmpty()) {
+                    Thread.sleep(DEFAULT_WORKER_CHECK_INTERVAL);

Review Comment:
   Let the user know that the program is blocking (e.g. log a message while waiting for a worker to register).



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java:
##########
@@ -80,64 +96,73 @@ public void startScheduling() {
         }
     }
 
-    private boolean applyResourceForPipeline(@NonNull SubPlan subPlan) {
-        try {
-            // apply resource for coordinators
-            subPlan.getCoordinatorVertexList().forEach(coordinator -> applyResourceForTask(coordinator));
-
-            // apply resource for other tasks
-            subPlan.getPhysicalVertexList().forEach(task -> applyResourceForTask(task));
-        } catch (JobNoEnoughResourceException e) {
-            LOGGER.severe(e);
-            return false;
+    private void releasePipelineResource(List<SlotProfile> slotProfiles) {
+        if (null == slotProfiles || slotProfiles.isEmpty()) {
+            return;
         }
-
-        return true;
+        resourceManager.releaseResources(jobId, slotProfiles).join();
     }
 
-    private void applyResourceForTask(PhysicalVertex task) {
+    private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) throws Exception {
         try {
-            // TODO If there is no enough resources for tasks, we need add some wait profile
-            if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
-                resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                    task.getTaskGroup().getTaskGroupLocation());
-            } else {
-                handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+            Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
+            Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
+            subPlan.getCoordinatorVertexList().forEach(coordinator -> {
+                coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+            });
+
+            subPlan.getPhysicalVertexList().forEach(task -> {
+                // TODO If there is no enough resources for tasks, we need add some wait profile
+                if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+                    // TODO custom resource size
+                    futures.put(task, resourceManager.applyResource(jobId, new ResourceProfile()));
+                } else {
+                    handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+                }
+            });
+            for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
+                try {
+                    slotProfiles.put(future.getKey(), future.getValue().get());
+                } catch (NoEnoughResourceException e) {
+                    // TODO custom exception with pipelineID, jobName etc.
+                    throw new JobNoEnoughResourceException("No enough resource to execute pipeline");

Review Comment:
   Use `throw new JobNoEnoughResourceException(..., e)` so the original cause is preserved?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java:
##########
@@ -80,64 +96,73 @@ public void startScheduling() {
         }
     }
 
-    private boolean applyResourceForPipeline(@NonNull SubPlan subPlan) {
-        try {
-            // apply resource for coordinators
-            subPlan.getCoordinatorVertexList().forEach(coordinator -> applyResourceForTask(coordinator));
-
-            // apply resource for other tasks
-            subPlan.getPhysicalVertexList().forEach(task -> applyResourceForTask(task));
-        } catch (JobNoEnoughResourceException e) {
-            LOGGER.severe(e);
-            return false;
+    private void releasePipelineResource(List<SlotProfile> slotProfiles) {
+        if (null == slotProfiles || slotProfiles.isEmpty()) {
+            return;
         }
-
-        return true;
+        resourceManager.releaseResources(jobId, slotProfiles).join();
     }
 
-    private void applyResourceForTask(PhysicalVertex task) {
+    private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) throws Exception {
         try {
-            // TODO If there is no enough resources for tasks, we need add some wait profile
-            if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
-                resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                    task.getTaskGroup().getTaskGroupLocation());
-            } else {
-                handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+            Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
+            Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
+            subPlan.getCoordinatorVertexList().forEach(coordinator -> {
+                coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+            });
+
+            subPlan.getPhysicalVertexList().forEach(task -> {
+                // TODO If there is no enough resources for tasks, we need add some wait profile
+                if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+                    // TODO custom resource size
+                    futures.put(task, resourceManager.applyResource(jobId, new ResourceProfile()));
+                } else {
+                    handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+                }
+            });
+            for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
+                try {
+                    slotProfiles.put(future.getKey(), future.getValue().get());
+                } catch (NoEnoughResourceException e) {
+                    // TODO custom exception with pipelineID, jobName etc.
+                    throw new JobNoEnoughResourceException("No enough resource to execute pipeline");
+                }
             }
-        } catch (JobNoEnoughResourceException e) {
+            return slotProfiles;
+        } catch (JobNoEnoughResourceException | ExecutionException | InterruptedException e) {
             LOGGER.severe(e);
+            throw e;
         }
     }
 
-    private CompletableFuture<Void> deployTask(PhysicalVertex task) {
+    private CompletableFuture<Void> deployTask(PhysicalVertex task, Supplier<Void> supplier) {

Review Comment:
   Rename `supplier` to a more descriptive name?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/heartbeat/HeartbeatManager.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.heartbeat;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HeartbeatManager {
+
+    private static final ILogger LOGGER = Logger.getLogger(HeartbeatManager.class);
+
+    private static final int DEFAULT_TIMEOUT_MILLISECONDS = 5000;
+    private static final int DEFAULT_INTERVAL_MILLISECONDS = 3000;
+    private final Map<String, Long> lastHeartbeat;

Review Comment:
   Use `ConcurrentMap`?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatListener;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatManager;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+    private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
+    private static final ILogger LOGGER = Logger.getLogger(AbstractResourceManager.class);
+
+    protected final Map<String, WorkerProfile> registerWorker;

Review Comment:
   Use `ConcurrentMap`?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatListener;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatManager;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+    private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
+    private static final ILogger LOGGER = Logger.getLogger(AbstractResourceManager.class);
+
+    protected final Map<String, WorkerProfile> registerWorker;
+
+    private final HeartbeatManager heartbeatManager;
+    private final NodeEngine nodeEngine;
+
+    private final ExecutionMode mode = ExecutionMode.LOCAL;
+
+    public AbstractResourceManager(NodeEngine nodeEngine) {
+        this.registerWorker = new ConcurrentHashMap<>();
+        this.nodeEngine = nodeEngine;
+        this.heartbeatManager = new HeartbeatManager(new WorkerHeartbeatListener());
+    }
+
+    @Override
+    public void init() {
+        heartbeatManager.start(Executors.newSingleThreadScheduledExecutor());
+    }
+
+    @Override
+    public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile) throws NoEnoughResourceException {
+        waitingWorkerRegister();
+        CompletableFuture<SlotProfile> completableFuture = new CompletableFuture<>();
+        applyResources(jobId, Collections.singletonList(resourceProfile)).whenComplete((profile, error) -> {
+            if (error != null) {
+                completableFuture.completeExceptionally(error);
+            } else {
+                completableFuture.complete(profile.get(0));
+            }
+        });
+        return completableFuture;
+    }
+
+    private void waitingWorkerRegister() {
+        if (ExecutionMode.LOCAL.equals(mode)) {
+            // Local mode, should wait worker(master node) register.
+            try {
+                while (registerWorker.isEmpty()) {
+                    Thread.sleep(DEFAULT_WORKER_CHECK_INTERVAL);

Review Comment:
   Add a log message here.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -172,7 +186,8 @@ public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
             final Map<Boolean, List<Task>> byCooperation =
                 tasks.stream()
                     .peek(x -> {
-                        TaskExecutionContext taskExecutionContext = new TaskExecutionContext(x, nodeEngine);
+                        TaskExecutionContext taskExecutionContext = new TaskExecutionContext(x, nodeEngine,
+                                slotContext);
                         x.setTaskExecutionContext(taskExecutionContext);

Review Comment:
   Rename `x` to `task`?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/heartbeat/HeartbeatManager.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.heartbeat;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HeartbeatManager {
+
+    private static final ILogger LOGGER = Logger.getLogger(HeartbeatManager.class);
+
+    private static final int DEFAULT_TIMEOUT_MILLISECONDS = 5000;
+    private static final int DEFAULT_INTERVAL_MILLISECONDS = 3000;
+    private final Map<String, Long> lastHeartbeat;
+    private final HeartbeatListener listener;
+
+    public void heartbeat(String nodeID) {
+        lastHeartbeat.put(nodeID, System.currentTimeMillis());
+    }
+
+    public void removeNode(String nodeID) {
+        lastHeartbeat.remove(nodeID);
+    }
+
+    public HeartbeatManager(HeartbeatListener listener) {
+        this.listener = listener;
+        this.lastHeartbeat = new ConcurrentHashMap<>();
+    }
+
+    public void start(ScheduledExecutorService scheduledExecutorService) {
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            lastHeartbeat.forEach((nodeID, last) -> {
+                    long now = System.currentTimeMillis();
+                    // TODO support custom timeout
+                    if (now - last > DEFAULT_TIMEOUT_MILLISECONDS) {
+                        LOGGER.severe("Node heartbeat timeout, disconnected for heartbeatManager. " +
+                                "NodeID: " + nodeID);
+                        listener.nodeDisconnected(nodeID);
+                    }
+                }

Review Comment:
   Log the exception message, or set an `UncaughtExceptionHandler`?
   
   The default `UncaughtExceptionHandler` prints the message to the console.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.service.slot;
+
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The slot service of seatunnel server, used for manage slot in worker.
+ */
+public class DefaultSlotService implements SlotService {
+
+    private static final ILogger LOGGER = Logger.getLogger(DefaultSlotService.class);
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT = 2000;
+    private static final int HEARTBEAT_RETRY_TIME = 5;
+    private final NodeEngineImpl nodeEngine;
+
+    private AtomicReference<ResourceProfile> unassignedResource;
+
+    private AtomicReference<ResourceProfile> assignedResource;
+
+    private Map<Integer, SlotProfile> assignedSlots;
+
+    private Map<Integer, SlotProfile> unassignedSlots;
+    private ScheduledExecutorService scheduledExecutorService;
+    private final String serviceID;
+    private final boolean dynamicSlot;
+    private final int slotNumber;
+    private final IdGenerator idGenerator;
+    private final TaskExecutionService taskExecutionService;
+    private Map<Integer, SlotContext> contexts;
+
+    public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot, int slotNumber) {
+        this.nodeEngine = nodeEngine;
+        this.dynamicSlot = dynamicSlot;
+        this.taskExecutionService = taskExecutionService;
+        this.slotNumber = slotNumber;
+        this.serviceID = nodeEngine.getThisAddress().toString();
+        this.idGenerator = new IdGenerator();
+    }
+
+    @Override
+    public void init() {
+        contexts = new ConcurrentHashMap<>();
+        assignedSlots = new ConcurrentHashMap<>();
+        unassignedSlots = new ConcurrentHashMap<>();
+        unassignedResource = new AtomicReference<>(new ResourceProfile());
+        assignedResource = new AtomicReference<>(new ResourceProfile());
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        if (!dynamicSlot) {
+            initFixedSlots();
+        }
+        unassignedResource.set(getNodeResource());
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                RetryUtils.retryWithException(() -> {
+                    sendToMaster(new WorkerHeartbeatOperation(toWorkerProfile())).join();
+                    return null;
+                }, new RetryUtils.RetryMaterial(HEARTBEAT_RETRY_TIME, true, e -> true, DEFAULT_HEARTBEAT_TIMEOUT));
+            } catch (Exception e) {
+                throw new RuntimeException(e);

Review Comment:
   Log the exception message, or set an `UncaughtExceptionHandler`?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.service.slot;
+
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The slot service of seatunnel server, used for manage slot in worker.
+ */
+public class DefaultSlotService implements SlotService {
+
+    private static final ILogger LOGGER = Logger.getLogger(DefaultSlotService.class);
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT = 2000;
+    private static final int HEARTBEAT_RETRY_TIME = 5;
+    private final NodeEngineImpl nodeEngine;
+
+    private AtomicReference<ResourceProfile> unassignedResource;
+
+    private AtomicReference<ResourceProfile> assignedResource;
+
+    private Map<Integer, SlotProfile> assignedSlots;
+
+    private Map<Integer, SlotProfile> unassignedSlots;
+    private ScheduledExecutorService scheduledExecutorService;
+    private final String serviceID;
+    private final boolean dynamicSlot;
+    private final int slotNumber;
+    private final IdGenerator idGenerator;
+    private final TaskExecutionService taskExecutionService;
+    private Map<Integer, SlotContext> contexts;

Review Comment:
   Use `ConcurrentMap`?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatListener;
+import org.apache.seatunnel.engine.server.resourcemanager.heartbeat.HeartbeatManager;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+    private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
+    private static final ILogger LOGGER = Logger.getLogger(AbstractResourceManager.class);
+
+    protected final Map<String, WorkerProfile> registerWorker;
+
+    private final HeartbeatManager heartbeatManager;
+    private final NodeEngine nodeEngine;
+
+    private final ExecutionMode mode = ExecutionMode.LOCAL;
+
+    public AbstractResourceManager(NodeEngine nodeEngine) {
+        this.registerWorker = new ConcurrentHashMap<>();
+        this.nodeEngine = nodeEngine;
+        this.heartbeatManager = new HeartbeatManager(new WorkerHeartbeatListener());
+    }
+
+    @Override
+    public void init() {
+        heartbeatManager.start(Executors.newSingleThreadScheduledExecutor());

Review Comment:
   Missing logic to shut down the executor?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java:
##########
@@ -56,8 +56,7 @@ public void run() throws Exception {
         SinkAggregatedCommitterTask<?> task = null;
         for (int i = 0; i < RETRY_TIME; i++) {
             try {
-                task = server.getTaskExecutionService().getExecutionContext(committerTaskID.getTaskGroupLocation())
-                        .getTaskGroup().getTask(committerTaskID.getTaskID());
+                task = server.getTaskExecutionService().getTask(committerTaskID);
                 break;
             } catch (NullPointerException e) {
                 LOGGER.warning("can't get committer task , waiting task started");

Review Comment:
   Rename `RETRY_TIME` to `RETRY_NUMBER`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org