You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/10 09:53:57 UTC

[incubator-seatunnel] branch st-engine updated: [ST-Engine][TaskExecutionService]Add dynamic thread sharing optimization (#2366)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 37d9e0dfb [ST-Engine][TaskExecutionService]Add dynamic thread sharing optimization (#2366)
37d9e0dfb is described below

commit 37d9e0dfbbcf17835fc49c6a4ae765a9a6cad4e8
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Wed Aug 10 17:53:51 2022 +0800

    [ST-Engine][TaskExecutionService]Add dynamic thread sharing optimization (#2366)
    
    * Add dynamic thread sharing optimization (#2279)
---
 pom.xml                                            |   7 +
 seatunnel-engine/seatunnel-engine-common/pom.xml   |   2 +-
 .../engine/common/utils/NonCompletableFuture.java  |   2 +-
 seatunnel-engine/seatunnel-engine-server/pom.xml   |   4 +
 .../seatunnel/engine/server/SeaTunnelServer.java   |   3 +-
 .../engine/server/TaskExecutionService.java        | 290 ++++++++++++++++-----
 .../seatunnel/engine/server/execution/Task.java    |  19 +-
 .../engine/server/execution/TaskCallTimer.java     | 146 +++++++++++
 .../server/execution/TaskExecutionContext.java     |  38 ++-
 .../engine/server/execution/TaskGroup.java         |   3 +
 .../execution/{TaskGroup.java => TaskTracker.java} |  22 +-
 .../seatunnel/engine/server/task/AbstractTask.java |   8 +-
 .../task/SeaTunnelSplitEnumeratorContext.java      |   2 +-
 .../engine/server/task/SeaTunnelTask.java          |   4 +-
 .../server/task/SourceSplitEnumeratorTask.java     |  15 +-
 .../server/task/operation/RegisterOperation.java   |   4 +-
 .../test/execution/TaskExecutionServiceTest.java   |  91 -------
 .../engine/server/TaskExecutionServiceTest.java    | 290 +++++++++++++++++++++
 .../server/execution/ExceptionTestTask.java}       |  33 ++-
 .../server/execution/FixedCallTestTimeTask.java}   |  55 ++--
 .../engine/server/execution/StopTimeTestTask.java} |  50 ++--
 .../engine/server}/execution/TestTask.java         |  22 +-
 22 files changed, 828 insertions(+), 282 deletions(-)

diff --git a/pom.xml b/pom.xml
index ab1634dec..0db72fd5c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,6 +209,7 @@
         <guava.version>19.0</guava.version>
         <auto-service.version>1.0.1</auto-service.version>
         <powermock.version>2.0.9</powermock.version>
+        <awaitility.version>4.1.1</awaitility.version>
         <hadoop2.version>2.6.5</hadoop2.version>
         <hadoop3.version>3.0.0</hadoop3.version>
         <seatunnel.shade.package>org.apache.seatunnel.shade</seatunnel.shade.package>
@@ -689,6 +690,12 @@
                 <version>${powermock.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>org.awaitility</groupId>
+                <artifactId>awaitility</artifactId>
+                <version>${awaitility.version}</version>
+                <scope>test</scope>
+            </dependency>
             <dependency>
                 <groupId>org.powermock</groupId>
                 <artifactId>powermock-api-mockito2</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml b/seatunnel-engine/seatunnel-engine-common/pom.xml
index afad57006..118479711 100644
--- a/seatunnel-engine/seatunnel-engine-common/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-common/pom.xml
@@ -45,7 +45,7 @@
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-api</artifactId>
-            <version>${project.version}</version>
+            <version>${version}</version>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
index a3c6d2380..5bedc6900 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
@@ -62,7 +62,7 @@ public class NonCompletableFuture<T> extends CompletableFuture<T> {
         throw new UnsupportedOperationException("This future can't be completed by an outside caller");
     }
 
-    private void internalComplete(T value) {
+    public void internalComplete(T value) {
         super.complete(value);
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml
index b193e438a..2df759f0e 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -47,6 +47,10 @@
             <artifactId>scala-library</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 24a36c355..4159346e4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -70,6 +70,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         taskExecutionService = new TaskExecutionService(
             nodeEngine, nodeEngine.getProperties()
         );
+        taskExecutionService.start();
     }
 
     @Override
@@ -79,7 +80,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     @Override
     public void shutdown(boolean terminate) {
-
+        taskExecutionService.shutdown();
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index fb5eed305..6e25cc2d1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -18,29 +18,42 @@
 package org.apache.seatunnel.engine.server;
 
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static com.hazelcast.jet.impl.util.Util.uncheckRun;
+import static java.util.Collections.emptyList;
 import static java.util.concurrent.Executors.newCachedThreadPool;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toList;
 
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskCallTimer;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskTracker;
 
-import com.hazelcast.jet.impl.util.NonCompletableFuture;
 import com.hazelcast.logging.ILogger;
-import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.properties.HazelcastProperties;
 import lombok.NonNull;
+import lombok.SneakyThrows;
 
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * This class is responsible for the execution of the Task
@@ -48,12 +61,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class TaskExecutionService {
 
     private final String hzInstanceName;
-    private final NodeEngine nodeEngine;
+    private final NodeEngineImpl nodeEngine;
     private final ILogger logger;
     private volatile boolean isShutdown;
-    private final ExecutorService blockingTaskletExecutor = newCachedThreadPool(new BlockingTaskThreadFactory());
+    private final LinkedBlockingDeque<TaskTracker> threadShareTaskQueue = new LinkedBlockingDeque<>();
+    private final ExecutorService executorService = newCachedThreadPool(new BlockingTaskThreadFactory());
+    private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
     // key: TaskID
-    private final ConcurrentMap<Long, TaskExecutionContext> executionContexts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, ConcurrentMap<Long, TaskExecutionContext>> executionContexts = new ConcurrentHashMap<>();
 
     public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -61,91 +76,91 @@ public class TaskExecutionService {
         this.logger = nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
     }
 
+    public void start() {
+        runBusWorkSupplier.runNewBusWork(false);
+    }
+
     public void shutdown() {
         isShutdown = true;
-        blockingTaskletExecutor.shutdownNow();
+        executorService.shutdownNow();
     }
 
-    public TaskExecutionContext getExecutionContext(long taskId) {
-        return executionContexts.get(taskId);
+    public ConcurrentMap<Long, TaskExecutionContext> getExecutionContext(long taskGroupId) {
+        return executionContexts.get(taskGroupId);
     }
 
-    /**
-     * Submit a TaskGroup and run the Task in it
-     */
-    public Map<Long, TaskExecutionContext> submitTask(
-        TaskGroup taskGroup
-    ) {
-        Map<Long, TaskExecutionContext> contextMap = new HashMap<>(taskGroup.getTasks().size());
-        taskGroup.getTasks().forEach(task -> {
-            contextMap.put(task.getTaskID(), submitTask(task));
-        });
-        return contextMap;
+    private void submitThreadShareTask(TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
+        tasks.stream()
+            .map(t -> new TaskTracker(t, taskGroupExecutionTracker))
+            .forEach(threadShareTaskQueue::add);
     }
 
-    public TaskExecutionContext submitTask(Task task) {
-        CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
-        TaskletTracker taskletTracker = new TaskletTracker(task, cancellationFuture);
-        taskletTracker.taskletFutures =
-            blockingTaskletExecutor.submit(new BlockingWorker(taskletTracker));
+    private void submitBlockingTask(TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
 
-        TaskExecutionContext taskExecutionContext = new TaskExecutionContext(
-            taskletTracker.future,
-            cancellationFuture,
-            this
-        );
-
-        executionContexts.put(task.getTaskID(), taskExecutionContext);
-        return taskExecutionContext;
+        CountDownLatch startedLatch = new CountDownLatch(tasks.size());
+        taskGroupExecutionTracker.blockingFutures = tasks
+            .stream()
+            .map(t -> new BlockingWorker(new TaskTracker(t, taskGroupExecutionTracker), startedLatch))
+            .map(executorService::submit)
+            .collect(toList());
 
+        // Do not return from this method until all workers have started. Otherwise
+        // on cancellation there is a race where the executor might not have started
+        // the worker yet. This would result in taskDone() never being called for
+        // a worker.
+        uncheckRun(startedLatch::await);
     }
 
-    private final class TaskletTracker {
-        final NonCompletableFuture future = new NonCompletableFuture();
-        final Task task;
-        volatile Future<?> taskletFutures;
-
-        TaskletTracker(Task task, CompletableFuture<Void> cancellationFuture) {
-            this.task = task;
-
-            cancellationFuture.whenComplete(withTryCatch(logger, (r, e) -> {
-                if (e == null) {
-                    e = new IllegalStateException("cancellationFuture should be completed exceptionally");
-                }
-                future.internalCompleteExceptionally(e);
-                taskletFutures.cancel(true);
-            }));
-        }
-
-        @Override
-        public String toString() {
-            return "Tracking " + task;
+    public CompletableFuture<TaskExecutionState> submitTaskGroup(
+        TaskGroup taskGroup,
+        CompletableFuture<Void> cancellationFuture
+    ) {
+        Collection<Task> tasks = taskGroup.getTasks();
+        final TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup);
+        try {
+            ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap = new ConcurrentHashMap<>();
+            final Map<Boolean, List<Task>> byCooperation =
+                tasks.stream()
+                    .peek(x -> {
+                        TaskExecutionContext taskExecutionContext = new TaskExecutionContext(x, nodeEngine);
+                        x.setTaskExecutionContext(taskExecutionContext);
+                        taskExecutionContextMap.put(x.getTaskID(), taskExecutionContext);
+                    })
+                    .collect(partitioningBy(Task::isThreadsShare));
+            submitThreadShareTask(executionTracker, byCooperation.get(true));
+            submitBlockingTask(executionTracker, byCooperation.get(false));
+            executionContexts.put(taskGroup.getId(), taskExecutionContextMap);
+        } catch (Throwable t) {
+            executionTracker.future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
         }
+        return new NonCompletableFuture<>(executionTracker.future);
     }
 
     private final class BlockingWorker implements Runnable {
 
-        private final TaskletTracker tracker;
+        private final TaskTracker tracker;
+        private final CountDownLatch startedLatch;
 
-        private BlockingWorker(TaskletTracker tracker) {
+        private BlockingWorker(TaskTracker tracker, CountDownLatch startedLatch) {
             this.tracker = tracker;
+            this.startedLatch = startedLatch;
         }
 
         @Override
         public void run() {
             final Task t = tracker.task;
             try {
+                startedLatch.countDown();
                 t.init();
                 ProgressState result;
                 do {
                     result = t.call();
-                } while (!result.isDone() && !isShutdown && !tracker.taskletFutures.isCancelled());
-
+                } while (!result.isDone() && !isShutdown && !tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
             } catch (Throwable e) {
                 logger.warning("Exception in " + t, e);
-                tracker.future.internalCompleteExceptionally(e);
+                tracker.taskGroupExecutionTracker.exception(e);
             } finally {
-                tracker.future.internalComplete();
+                tracker.taskGroupExecutionTracker.taskDone();
             }
         }
     }
@@ -156,7 +171,162 @@ public class TaskExecutionService {
         @Override
         public Thread newThread(@NonNull Runnable r) {
             return new Thread(r,
-                String.format("hz.%s.seaTunnel.blocking.thread-%d", hzInstanceName, seq.getAndIncrement()));
+                String.format("hz.%s.seaTunnel.task.thread-%d", hzInstanceName, seq.getAndIncrement()));
+        }
+    }
+
+    /**
+     * CooperativeTaskWorker polls the call method of the tasks in the shared task queue.
+     * When a task call times out, a new BusWork is created to take over the execution of the other tasks.
+     */
+    public final class CooperativeTaskWorker implements Runnable {
+
+        AtomicBoolean keep = new AtomicBoolean(true);
+        public AtomicReference<TaskTracker> exclusiveTaskTracker = new AtomicReference<>();
+        final TaskCallTimer timer;
+        public LinkedBlockingDeque<TaskTracker> taskqueue;
+
+        @SuppressWarnings("checkstyle:MagicNumber")
+        public CooperativeTaskWorker(LinkedBlockingDeque<TaskTracker> taskqueue, RunBusWorkSupplier runBusWorkSupplier) {
+            logger.info(String.format("Created new BusWork : %s", this.hashCode()));
+            this.taskqueue = taskqueue;
+            this.timer = new TaskCallTimer(50, keep, runBusWorkSupplier, this);
+        }
+
+        @SneakyThrows
+        @Override
+        public void run() {
+            while (keep.get()) {
+                TaskTracker taskTracker = null != exclusiveTaskTracker.get() ?
+                    exclusiveTaskTracker.get() :
+                    taskqueue.takeFirst();
+                TaskGroupExecutionTracker taskGroupExecutionTracker = taskTracker.taskGroupExecutionTracker;
+                if (taskGroupExecutionTracker.executionCompletedExceptionally()) {
+                    taskGroupExecutionTracker.taskDone();
+                    if (null != exclusiveTaskTracker.get()) {
+                        // If it's exclusive need to end the work
+                        break;
+                    } else {
+                        // No action required and don't put back
+                        continue;
+                    }
+                }
+                //start timer, if it's exclusive, don't need to start
+                if (null == exclusiveTaskTracker.get()) {
+                    timer.timerStart(taskTracker);
+                }
+                ProgressState call = null;
+                try {
+                    //run task
+                    call = taskTracker.task.call();
+                    synchronized (timer) {
+                        timer.timerStop();
+                    }
+                } catch (Throwable e) {
+                    //task Failure and complete
+                    taskGroupExecutionTracker.exception(e);
+                    taskGroupExecutionTracker.taskDone();
+                    //If it's exclusive need to end the work
+                    logger.warning("Exception in " + taskTracker.task, e);
+                    if (null != exclusiveTaskTracker.get()) {
+                        break;
+                    }
+                } finally {
+                    //stop timer
+                    timer.timerStop();
+                }
+                //task call finished
+                if (null != call) {
+                    if (call.isDone()) {
+                        //If it's exclusive, you need to end the work
+                        taskGroupExecutionTracker.taskDone();
+                        if (null != exclusiveTaskTracker.get()) {
+                            break;
+                        }
+                    } else {
+                        //Task is not completed. Put task to the end of the queue
+                        //If the current work has an exclusive tracker, it will not be put back
+                        if (null == exclusiveTaskTracker.get()) {
+                            taskqueue.offer(taskTracker);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Used to create and run a new BusWork
+     */
+    public final class RunBusWorkSupplier {
+
+        ExecutorService executorService;
+        LinkedBlockingDeque<TaskTracker> taskQueue;
+
+        public RunBusWorkSupplier(ExecutorService executorService, LinkedBlockingDeque<TaskTracker> taskqueue) {
+            this.executorService = executorService;
+            this.taskQueue = taskqueue;
+        }
+
+        public boolean runNewBusWork(boolean checkTaskQueue) {
+            if (!checkTaskQueue || taskQueue.size() > 0) {
+                executorService.submit(new CooperativeTaskWorker(taskQueue, this));
+                return true;
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Internal utility class to track the overall state of task execution.
+     * There's one instance of this class per submitted task group.
+     */
+    public final class TaskGroupExecutionTracker {
+
+        private final TaskGroup taskGroup;
+        final CompletableFuture<TaskExecutionState> future = new CompletableFuture<>();
+        volatile List<Future<?>> blockingFutures = emptyList();
+
+        private final AtomicInteger completionLatch;
+        private final AtomicReference<Throwable> executionException = new AtomicReference<>();
+
+        private final AtomicBoolean isCancel = new AtomicBoolean(false);
+
+        TaskGroupExecutionTracker(CompletableFuture<Void> cancellationFuture, TaskGroup taskGroup) {
+            this.completionLatch = new AtomicInteger(taskGroup.getTasks().size());
+            this.taskGroup = taskGroup;
+            cancellationFuture.whenComplete(withTryCatch(logger, (r, e) -> {
+                isCancel.set(true);
+                if (e == null) {
+                    e = new IllegalStateException("cancellationFuture should be completed exceptionally");
+                }
+                exception(e);
+                // Don't interrupt the threads. We require that they do not block for too long,
+                // interrupting them might make the termination faster, but can also cause troubles.
+                blockingFutures.forEach(f -> f.cancel(false));
+            }));
+        }
+
+        void exception(Throwable t) {
+            executionException.compareAndSet(null, t);
+        }
+
+        void taskDone() {
+            if (completionLatch.decrementAndGet() == 0) {
+                executionContexts.remove(taskGroup.getId());
+                Throwable ex = executionException.get();
+                if (ex == null) {
+                    future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FINISHED, null));
+                } else if (isCancel.get()) {
+                    future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.CANCELED, ex));
+                } else {
+                    future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, ex));
+                }
+            }
+        }
+
+        boolean executionCompletedExceptionally() {
+            return executionException.get() != null;
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index 9025facce..a35d63223 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -17,14 +17,10 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
-import com.hazelcast.spi.impl.operationservice.Operation;
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.NonNull;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.UUID;
 
 public interface Task extends Serializable {
 
@@ -37,24 +33,13 @@ public interface Task extends Serializable {
     @NonNull
     Long getTaskID();
 
-    default boolean isCooperative() {
+    default boolean isThreadsShare() {
         return false;
     }
 
     default void close() throws IOException {
     }
 
-    default void setOperationService(OperationService operationService) {
+    default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext){
     }
-
-    default <E> InvocationFuture<E> sendToMaster(Operation operation) {
-        // TODO add method send operation to master
-        return null;
-    }
-
-    default <E> InvocationFuture<E> sendToMember(Operation operation, UUID memberID) {
-        // TODO add method send operation to member
-        return null;
-    }
-
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskCallTimer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskCallTimer.java
new file mode 100644
index 000000000..d4514babf
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskCallTimer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * TaskCallTimer times the execution of a Task's call method and triggers the timeout action when the call runs too long
+ */
+public class TaskCallTimer extends Thread {
+
+    long nextExecutionTime;
+    long delay;
+
+    TaskExecutionService.CooperativeTaskWorker cooperativeTaskWorker;
+    AtomicBoolean keep;
+    TaskExecutionService.RunBusWorkSupplier runBusWorkSupplier;
+
+    TaskTracker taskTracker;
+
+    private final Object lock = new Object();
+    boolean started = false;
+    AtomicBoolean wait0 = new AtomicBoolean(false);
+
+    public TaskCallTimer(
+        long delay,
+        AtomicBoolean keep,
+        TaskExecutionService.RunBusWorkSupplier runBusWorkSupplier,
+        TaskExecutionService.CooperativeTaskWorker cooperativeTaskWorker) {
+        this.delay = delay;
+        this.keep = keep;
+        this.runBusWorkSupplier = runBusWorkSupplier;
+        this.cooperativeTaskWorker = cooperativeTaskWorker;
+    }
+
+    private void startTimer() {
+        nextExecutionTime = System.currentTimeMillis() + delay;
+        this.start();
+    }
+
+    public void reSet(long tmpDelay) {
+        nextExecutionTime = System.currentTimeMillis() + tmpDelay;
+        if (started) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        } else {
+            started = true;
+            this.start();
+        }
+    }
+
+    public void reSet() {
+        nextExecutionTime = System.currentTimeMillis() + delay;
+        if (!started) {
+            started = true;
+            this.start();
+        }
+
+    }
+
+    public void timerStart(TaskTracker taskTracker) {
+        wait0.set(false);
+        this.taskTracker = taskTracker;
+        nextExecutionTime = System.currentTimeMillis() + delay;
+        if (started) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        } else {
+            started = true;
+            this.start();
+        }
+    }
+
+    public void timerStop() {
+        wait0.set(true);
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            long currentTime;
+            long executionTime;
+            boolean wait;
+            try {
+                synchronized (this) {
+                    wait = wait0.get();
+                    currentTime = System.currentTimeMillis();
+                    executionTime = this.nextExecutionTime;
+                    if (!wait && executionTime <= currentTime) {
+                        timeoutAct(this.taskTracker.expiredTimes.incrementAndGet());
+                        break;
+                    }
+                }
+                if (wait) {
+                    synchronized (lock) {
+                        lock.wait();
+                    }
+                } else {
+                    synchronized (lock) {
+                        lock.wait(executionTime - currentTime);
+                    }
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * The action to be performed when the task call method execution times out
+     */
+    private void timeoutAct(int expiredTimes) {
+        if (expiredTimes >= 1) {
+            // 1 busWork keep on running
+            keep.set(true);
+            // 2 busWork exclusive to the current taskTracker
+            cooperativeTaskWorker.exclusiveTaskTracker.set(taskTracker);
+            // 3 Submit a new BusWork to execute other tasks
+            runBusWorkSupplier.runNewBusWork(false);
+        } else {
+            // 1 Stop the current busWork from continuing to execute the new Task
+            keep.set(false);
+            // 2 Submit a new BusWork to execute other tasks
+            runBusWorkSupplier.runNewBusWork(false);
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 3cd8c6734..baccf0e34 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -17,33 +17,31 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
-import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
 
-import java.util.concurrent.CompletableFuture;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 
 public class TaskExecutionContext {
 
-    // future which is Task submit
-    public volatile CompletableFuture<Void> executionFuture;
+    private final Task task; // task this context was created for; not read by the methods visible here
+    private final NodeEngineImpl nodeEngine; // Hazelcast node engine used to build and send the invocations below
 
-    // future which can only be used to cancel the local execution.
-    private volatile CompletableFuture<Void> cancellationFuture;
-
-    private TaskExecutionService taskExecutionService;
-
-    public TaskExecutionContext(
-        CompletableFuture<Void> executionFuture,
-        CompletableFuture<Void> cancellationFuture,
-        TaskExecutionService taskExecutionService
-    ) {
-        this.executionFuture = executionFuture;
-        this.cancellationFuture = cancellationFuture;
-        this.taskExecutionService = taskExecutionService;
+    public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine) {
+        this.task = task;
+        this.nodeEngine = nodeEngine;
     }
 
-    public CompletableFuture<Void> cancel() {
-        cancellationFuture.cancel(true);
-        return executionFuture;
+    public  <E> InvocationFuture<E> sendToMaster(Operation operation) { // invokes the operation for SeaTunnelServer.SERVICE_NAME at nodeEngine.getMasterAddress() and returns the invocation future
+        InvocationBuilder invocationBuilder = nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress());
+        return invocationBuilder.invoke();
     }
 
+    public <E> InvocationFuture<E> sendToMember(Operation operation, Address memberID) { // same invocation, but targeted at the given member address
+        InvocationBuilder invocationBuilder = nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, memberID);
+        return invocationBuilder.invoke();
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index c0123277c..764a1cdf8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -24,6 +24,9 @@ import java.util.Collection;
 
 @Data
 public class TaskGroup implements Serializable {
+    //TODO id is required; the constructor needs to be modified later
+    private long id;
+
     private final String taskGroupName;
 
     private final Collection<Task> tasks;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskTracker.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskTracker.java
index c0123277c..2ab25d3b9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskTracker.java
@@ -17,14 +17,22 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
-import lombok.Data;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
 
-import java.io.Serializable;
-import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
 
-@Data
-public class TaskGroup implements Serializable {
-    private final String taskGroupName;
+public class TaskTracker { // pairs a task with its group's execution tracker and a counter of expired calls
+    public final AtomicInteger expiredTimes = new AtomicInteger(0); // bumped via incrementAndGet() when a call to the task is found to have passed its deadline
+    public final TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker;
+    public final Task task;
 
-    private final Collection<Task> tasks;
+    public TaskTracker(Task task, TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker) {
+        this.task = task;
+        this.taskGroupExecutionTracker = taskGroupExecutionTracker;
+    }
+
+    @Override
+    public String toString() { // short label built from the tracked task's own toString()
+        return "Tracking " + task;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index e15689c86..49ae36ceb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -19,8 +19,8 @@ package org.apache.seatunnel.engine.server.task;
 
 import org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
 
-import com.hazelcast.spi.impl.operationservice.OperationService;
 import lombok.NonNull;
 
 import java.net.URL;
@@ -29,7 +29,7 @@ import java.util.Set;
 public abstract class AbstractTask implements Task {
     private static final long serialVersionUID = -2524701323779523718L;
 
-    protected OperationService operationService;
+    protected TaskExecutionContext executionContext;
     protected long taskID;
 
     protected Progress progress;
@@ -42,8 +42,8 @@ public abstract class AbstractTask implements Task {
     public abstract Set<URL> getJarsUrl();
 
     @Override
-    public void setOperationService(OperationService operationService) {
-        this.operationService = operationService;
+    public void setTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
+        this.executionContext = taskExecutionContext;
     }
 
     @NonNull
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
index 319bc4ec6..ab206b0b7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
@@ -48,7 +48,7 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit> impleme
 
     @Override
     public void assignSplit(int subtaskId, List<SplitT> splits) {
-        task.sendToMember(new AssignSplitOperation<>(subtaskId, splits), task.getTaskMemberID(subtaskId));
+        task.executionContext.sendToMember(new AssignSplitOperation<>(subtaskId, splits), task.getTaskMemberAddr(subtaskId));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index b5c381999..44c48aa26 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -59,12 +59,12 @@ public class SeaTunnelTask extends AbstractTask {
 
     private void register() {
         if (startFromSource()) {
-            sendToMaster(new RegisterOperation(taskID, enumeratorTaskID));
+            this.executionContext.sendToMaster(new RegisterOperation(taskID, enumeratorTaskID));
         }
     }
 
     private void requestSplit() {
-        sendToMaster(new RequestSplitOperation(taskID, enumeratorTaskID));
+        this.executionContext.sendToMaster(new RequestSplitOperation(taskID, enumeratorTaskID));
     }
 
     private boolean startFromSource() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 4605d52f9..8fd425b31 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -21,13 +21,14 @@ import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
 
+import com.hazelcast.cluster.Address;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends CoordinatorTask {
 
@@ -36,7 +37,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     private final PhysicalSourceAction<?, SplitT, ?> source;
     private SourceSplitEnumerator<?, ?> enumerator;
     private SeaTunnelSplitEnumeratorContext<SplitT> context;
-    private Map<Integer, UUID> taskMemberMapping;
+    private Map<Integer, Address> taskMemberMapping;
 
     @Override
     public void init() throws Exception {
@@ -58,8 +59,8 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         this.source = source;
     }
 
-    private void receivedReader(int readerId, UUID memberID) {
-        this.addTaskMemberMapping(readerId, memberID);
+    private void receivedReader(int readerId, Address memberAddr) {
+        this.addTaskMemberMapping(readerId, memberAddr);
         enumerator.registerReader(readerId);
     }
 
@@ -67,11 +68,11 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         enumerator.handleSplitRequest(taskID);
     }
 
-    private void addTaskMemberMapping(int taskID, UUID memberID) {
-        taskMemberMapping.put(taskID, memberID);
+    private void addTaskMemberMapping(int taskID, Address memberAddr) {
+        taskMemberMapping.put(taskID, memberAddr);
     }
 
-    public UUID getTaskMemberID(int taskID) {
+    public Address getTaskMemberAddr(int taskID) {
         return taskMemberMapping.get(taskID);
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
index 5c26c3a39..334f0152e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
@@ -28,6 +28,7 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * For {@link org.apache.seatunnel.api.source.SourceReader} to register with
@@ -50,8 +51,7 @@ public class RegisterOperation extends Operation implements IdentifiedDataSerial
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         UUID readerUUID = getCallerUuid();
-        TaskExecutionContext executionContext =
-                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
+        ConcurrentMap<Long, TaskExecutionContext> executionContextMap = server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
         // TODO register reader to enumerator
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TaskExecutionServiceTest.java
deleted file mode 100644
index 014e62efd..000000000
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TaskExecutionServiceTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package execution;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
-import org.apache.seatunnel.engine.server.TaskExecutionService;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
-import com.hazelcast.logging.ILogger;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-
-
-public class TaskExecutionServiceTest {
-
-    HazelcastInstanceImpl instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(new Config(), Thread.currentThread().getName(), new SeaTunnelNodeContext())).getOriginal();
-    SeaTunnelServer service = instance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
-    ILogger logger = instance.node.nodeEngine.getLogger(TaskExecutionServiceTest.class);
-
-
-    @Test
-    public void testAll() throws InterruptedException {
-        testCancel();
-        testFinish();
-    }
-
-    public void testCancel() throws InterruptedException {
-        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
-
-        AtomicBoolean stop = new AtomicBoolean(false);
-        TestTask testTask = new TestTask(stop, logger);
-
-        TaskExecutionContext taskExecutionContext = taskExecutionService.submitTask(testTask);
-
-        Thread.sleep(3000);
-
-        taskExecutionContext.cancel();
-
-        Thread.sleep(30000);
-        assertTrue(taskExecutionContext.executionFuture.isCompletedExceptionally());
-    }
-
-    public void testFinish() throws InterruptedException {
-        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
-
-        AtomicBoolean stop = new AtomicBoolean(false);
-        AtomicBoolean futureMark = new AtomicBoolean(false);
-        TestTask testTasklet = new TestTask(stop, logger);
-
-        TaskExecutionContext taskExecutionContext = taskExecutionService.submitTask(testTasklet);
-        taskExecutionContext.executionFuture.whenComplete(new BiConsumer<Void, Throwable>() {
-            @Override
-            public void accept(Void unused, Throwable throwable) {
-                futureMark.set(true);
-            }
-        });
-
-        Thread.sleep(3000);
-
-        stop.set(true);
-
-        Thread.sleep(1000);
-
-        assertTrue(taskExecutionContext.executionFuture.isDone());
-        assertTrue(futureMark.get());
-    }
-}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
new file mode 100644
index 000000000..0bf524cb6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import static org.apache.seatunnel.engine.server.execution.ExecutionState.CANCELED;
+import static org.apache.seatunnel.engine.server.execution.ExecutionState.FAILED;
+import static org.apache.seatunnel.engine.server.execution.ExecutionState.FINISHED;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
+import com.hazelcast.logging.ILogger;
+import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
+import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
+import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
+import org.apache.seatunnel.engine.server.execution.TestTask;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+
+public class TaskExecutionServiceTest {
+
+    HazelcastInstanceImpl instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(new Config(), Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+    SeaTunnelServer service = instance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+    ILogger logger = instance.node.nodeEngine.getLogger(TaskExecutionServiceTest.class);
+
+    long taskRunTime = 2000; // ms the long-running scenarios are left running before they are told to stop
+    /** Runs every scenario one after another against the same service instance. */
+    @Test
+    public void testAll() throws InterruptedException, ExecutionException {
+        logger.info("----------start Cancel test----------");
+        testCancel();
+
+        logger.info("----------start Finish test----------");
+        testFinish();
+
+        logger.info("----------start Delay test----------");
+        testDelay();
+        testDelay();
+
+        logger.info("----------start ThrowException test----------");
+        testThrowException();
+
+        logger.info("----------start CriticalCallTime test----------");
+        testCriticalCallTime();
+
+    }
+    /** Cancels a freshly submitted two-task group and expects it to end in the CANCELED state. */
+    public void testCancel() {
+        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+        long sleepTime = 300;
+
+        AtomicBoolean stop = new AtomicBoolean(false);
+        TestTask testTask1 = new TestTask(stop, logger, sleepTime,true);
+        TestTask testTask2 = new TestTask(stop, logger, sleepTime,false);
+
+        CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
+
+        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+
+        cancellationFuture.cancel(true);
+
+        await().atMost(sleepTime + 300, TimeUnit.MILLISECONDS).untilAsserted(()->{
+            assertEquals(CANCELED, completableFuture.get().getExecutionState());
+        });
+    }
+    /** Sets the shared stop flag and expects the group to end as FINISHED with its completion callback invoked. */
+    public void testFinish() {
+        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+        long sleepTime = 300;
+
+        AtomicBoolean stop = new AtomicBoolean(false);
+        AtomicBoolean futureMark = new AtomicBoolean(false);
+        TestTask testTask1 = new TestTask(stop, logger, sleepTime,true);
+        TestTask testTask2 = new TestTask(stop, logger, sleepTime,false);
+
+        CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
+
+        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+        completableFuture.whenComplete(new BiConsumer<TaskExecutionState, Throwable>() {
+            @Override
+            public void accept(TaskExecutionState unused, Throwable throwable) {
+                futureMark.set(true);
+            }
+        });
+
+        stop.set(true);
+
+        // The whenComplete callback can still be in flight when get() returns, so it is polled as well
+        await().atMost(sleepTime + 100, TimeUnit.MILLISECONDS).untilAsserted(()->{
+            assertEquals(FINISHED, completableFuture.get().getExecutionState());
+            assertTrue(futureMark.get());
+        });
+    }
+
+
+    /**
+     * Runs tasks whose call time equals the timer timeout and checks that each task is done exactly once
+     */
+    public void testCriticalCallTime() throws InterruptedException {
+        AtomicBoolean stopMark = new AtomicBoolean(false);
+        CopyOnWriteArrayList<Long> stopTime = new CopyOnWriteArrayList<>();
+
+        int count = 100;
+
+        //Must be the same as the timer timeout
+        int callTime = 50;
+
+        //Create tasks with critical delays
+        List<Task> criticalTask = buildStopTestTask(callTime, count, stopMark, stopTime);
+
+        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroup("t1", Lists.newArrayList(criticalTask)), new CompletableFuture<Void>());
+
+        // Run it for a while
+        Thread.sleep(taskRunTime);
+
+        //stop task
+        stopMark.set(true);
+
+        // Check that all tasks end correctly
+        await().atMost(count * callTime, TimeUnit.MILLISECONDS).untilAsserted(()->{
+            assertEquals(FINISHED, taskCts.get().getExecutionState());
+        });
+
+        //Check that each Task is only Done once
+        assertEquals(count, stopTime.size());
+
+    }
+    /** Makes two single-task groups fail on demand and checks that a separate mixed-lag group still finishes normally. */
+    public void testThrowException() throws InterruptedException {
+        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+        AtomicBoolean stopMark = new AtomicBoolean(false);
+
+        long t1Sleep = 100;
+        long t2Sleep = 50;
+
+        long lowLagSleep = 50;
+        long highLagSleep = 300;
+        // Thread-safe lists: this test thread adds the throwable while the running task's call() polls the list
+        List<Throwable> t1throwable = new CopyOnWriteArrayList<>();
+        ExceptionTestTask t1 = new ExceptionTestTask(t1Sleep, "t1", t1throwable);
+
+        List<Throwable> t2throwable = new CopyOnWriteArrayList<>();
+        ExceptionTestTask t2 = new ExceptionTestTask(t2Sleep, "t2", t2throwable);
+
+        //Create low lag tasks
+        List<Task> lowLagTask = buildFixedTestTask(lowLagSleep, 10, stopMark, new CopyOnWriteArrayList<>());
+
+        //Create high lag tasks
+        List<Task> highLagTask = buildFixedTestTask(highLagSleep, 5, stopMark, new CopyOnWriteArrayList<>());
+
+        List<Task> tasks = new ArrayList<>();
+        tasks.addAll(highLagTask);
+        tasks.addAll(lowLagTask);
+        Collections.shuffle(tasks);
+
+        // One mixed group of well-behaved tasks, plus two single-task groups that are made to fail below
+        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(tasks)), new CompletableFuture<Void>());
+
+
+        CompletableFuture<TaskExecutionState> t1c = taskExecutionService.submitTaskGroup(new TaskGroup("t1", Lists.newArrayList(t1)), new CompletableFuture<Void>());
+
+        CompletableFuture<TaskExecutionState> t2c = taskExecutionService.submitTaskGroup(new TaskGroup("t2", Lists.newArrayList(t2)), new CompletableFuture<Void>());
+
+        Thread.sleep(taskRunTime);
+
+        t1throwable.add(new IOException());
+        t2throwable.add(new IOException());
+
+        await().atMost(t1Sleep + t2Sleep, TimeUnit.MILLISECONDS).untilAsserted(()->{
+            assertEquals(FAILED, t1c.get().getExecutionState());
+            assertEquals(FAILED, t2c.get().getExecutionState());
+        });
+
+        stopMark.set(true);
+
+        await().atMost(lowLagSleep * 10 + highLagSleep, TimeUnit.MILLISECONDS).untilAsserted(()->{
+            assertEquals(FINISHED, taskCts.get().getExecutionState());
+        });
+    }
+    /** Runs low- and high-lag tasks in one group and checks that the low-lag tasks' average recorded lag stays below 400. */
+    public void testDelay() throws InterruptedException {
+
+        long lowLagSleep = 10;
+        long highLagSleep = 300;
+
+        AtomicBoolean stopMark = new AtomicBoolean(false);
+
+        CopyOnWriteArrayList<Long> lowLagList = new CopyOnWriteArrayList<>();
+        CopyOnWriteArrayList<Long> highLagList = new CopyOnWriteArrayList<>();
+
+        //Create low lag tasks
+        List<Task> lowLagTask = buildFixedTestTask(lowLagSleep, 10, stopMark, lowLagList);
+
+        //Create high lag tasks
+        List<Task> highLagTask = buildFixedTestTask(highLagSleep, 5, stopMark, highLagList);
+
+        List<Task> tasks = new ArrayList<>();
+        tasks.addAll(highLagTask);
+        tasks.addAll(lowLagTask);
+        Collections.shuffle(tasks);
+
+        TaskGroup taskGroup = new TaskGroup("ts", Lists.newArrayList(tasks));
+
+
+        logger.info("task size is : " + taskGroup.getTasks().size());
+
+        TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+        CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
+
+        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(taskGroup, cancellationFuture);
+
+        //Let the tasks run for a while, then stop them
+        Thread.sleep(taskRunTime);
+        stopMark.set(true);
+
+        //Check that all tasks end correctly
+        await().atMost(lowLagSleep * 10 + highLagSleep, TimeUnit.MILLISECONDS).untilAsserted(()->{
+            assertEquals(FINISHED, completableFuture.get().getExecutionState());
+        });
+
+        //Compute the average lag of each kind of task
+        double lowAvg = lowLagList.stream().mapToLong(x -> x).average().getAsDouble();
+        double highAvg = highLagList.stream().mapToLong(x -> x).average().getAsDouble();
+
+        assertTrue(lowAvg < 400);
+
+        logger.info("lowAvg : " + lowAvg);
+        logger.info("highAvg : " + highAvg);
+
+    }
+    /** Builds {@code count} FixedCallTestTimeTask instances that share the stop flag and the lag list. */
+    public List<Task> buildFixedTestTask(long callTime, long count, AtomicBoolean stopMart, CopyOnWriteArrayList<Long> lagList) {
+        List<Task> taskQueue = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            taskQueue.add(new FixedCallTestTimeTask(callTime, callTime + "t" + i, stopMart, lagList));
+        }
+        return taskQueue;
+    }
+    /** Builds {@code count} StopTimeTestTask instances that share the stop flag and the stop list. */
+    public List<Task> buildStopTestTask(long callTime, long count, AtomicBoolean stopMart, CopyOnWriteArrayList<Long> stopList) {
+        List<Task> taskQueue = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            taskQueue.add(new StopTimeTestTask(callTime, stopList, stopMart));
+        }
+        return taskQueue;
+    }
+
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/ExceptionTestTask.java
similarity index 59%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/ExceptionTestTask.java
index c0123277c..6815e29e2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/ExceptionTestTask.java
@@ -17,14 +17,33 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
-import lombok.Data;
+import lombok.AllArgsConstructor;
+import lombok.NonNull;
+import lombok.SneakyThrows;
 
-import java.io.Serializable;
-import java.util.Collection;
+import java.util.List;
 
-@Data
-public class TaskGroup implements Serializable {
-    private final String taskGroupName;
+@AllArgsConstructor
+public class ExceptionTestTask implements Task { // test task that sleeps on every call until a throwable is queued, then throws it
+    long callTime; // milliseconds call() sleeps while no throwable is queued
+    String name;
+    List<Throwable> throwE; // call() throws the first element as soon as this list is non-empty
 
-    private final Collection<Task> tasks;
+    @SneakyThrows // lets call() rethrow the queued Throwable and the sleep's InterruptedException without a throws clause
+    @NonNull
+    @Override
+    public ProgressState call() {
+        if(!throwE.isEmpty()){
+            throw throwE.get(0);
+        }else {
+            Thread.sleep(callTime);
+        }
+        return ProgressState.MADE_PROGRESS;
+    }
+
+    @NonNull
+    @Override
+    public Long getTaskID() {
+        return (long) this.hashCode(); // hashCode() is not overridden in this class, so the inherited hash serves as the task id
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/FixedCallTestTimeTask.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/FixedCallTestTimeTask.java
index 21ae87bfe..d3022287b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/FixedCallTestTimeTask.java
@@ -15,49 +15,54 @@
  * limitations under the License.
  */
 
-package execution;
+package org.apache.seatunnel.engine.server.execution;
 
-import org.apache.seatunnel.engine.server.execution.ProgressState;
-import org.apache.seatunnel.engine.server.execution.Task;
-
-import com.hazelcast.logging.ILogger;
 import lombok.NonNull;
 
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-/**
- * For test use, only print logs
- */
-public class TestTask implements Task {
-
+/** Test task that logs the gap between consecutive call() starts to lagList, then sleeps callTime ms. */
+public class FixedCallTestTimeTask implements Task {
+    long callTime;
+    String name;
+    long currentTime;
+    CopyOnWriteArrayList<Long> lagList;
     AtomicBoolean stop;
-    private final ILogger logger;
 
-    public TestTask(AtomicBoolean stop, ILogger logger){
+    public FixedCallTestTimeTask(long callTime, String name, AtomicBoolean stop, CopyOnWriteArrayList<Long> lagList) {
+        this.callTime = callTime;
+        this.name = name;
         this.stop = stop;
-        this.logger = logger;
+        this.lagList = lagList;
     }
 
     @NonNull
     @Override
     public ProgressState call() {
-        ProgressState progressState;
-        if (!stop.get()){
-            logger.info("TestTasklet is running");
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-            }
-            progressState = ProgressState.MADE_PROGRESS;
-        }else {
-            progressState = ProgressState.DONE;
+        long now = System.currentTimeMillis(); // one reading serves both the lag sample and the new baseline
+        if (currentTime != 0) {
+            lagList.add(now - currentTime);
+        }
+        currentTime = now;
+        try {
+            Thread.sleep(callTime);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e); // keeps the cause; the message is still e.toString()
         }
-        return progressState;
+        // DONE once the shared stop flag is set, MADE_PROGRESS otherwise.
+        boolean finished = stop.get();
+        return finished ? ProgressState.DONE : ProgressState.MADE_PROGRESS;
     }
 
     @NonNull
     @Override
     public Long getTaskID() {
-        return 1L;
+        return (long) this.hashCode();
+    }
+
+    @Override
+    public boolean isThreadsShare() {
+        return true;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/StopTimeTestTask.java
similarity index 56%
copy from seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/StopTimeTestTask.java
index 21ae87bfe..5bc1bb50c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/StopTimeTestTask.java
@@ -15,49 +15,43 @@
  * limitations under the License.
  */
 
-package execution;
+package org.apache.seatunnel.engine.server.execution;
 
-import org.apache.seatunnel.engine.server.execution.ProgressState;
-import org.apache.seatunnel.engine.server.execution.Task;
-
-import com.hazelcast.logging.ILogger;
+import lombok.AllArgsConstructor;
 import lombok.NonNull;
 
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-/**
- * For test use, only print logs
- */
-public class TestTask implements Task {
-
+@AllArgsConstructor
+public class StopTimeTestTask implements Task {
+    long callTime; // milliseconds slept on every call()
+    CopyOnWriteArrayList<Long> stopList; // collects the id of the thread that sees stop set
     AtomicBoolean stop;
-    private final ILogger logger;
-
-    public TestTask(AtomicBoolean stop, ILogger logger){
-        this.stop = stop;
-        this.logger = logger;
-    }
 
     @NonNull
     @Override
     public ProgressState call() {
-        ProgressState progressState;
-        if (!stop.get()){
-            logger.info("TestTasklet is running");
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-            }
-            progressState = ProgressState.MADE_PROGRESS;
-        }else {
-            progressState = ProgressState.DONE;
+        try {
+            Thread.sleep(callTime);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e); // keeps the cause; the message is still e.toString()
         }
-        return progressState;
+        if (stop.get()) {
+            stopList.add(Thread.currentThread().getId());
+            return ProgressState.DONE;
+        }
+        return ProgressState.MADE_PROGRESS;
     }
 
     @NonNull
     @Override
     public Long getTaskID() {
-        return 1L;
+        return (long) this.hashCode();
+    }
+
+    @Override
+    public boolean isThreadsShare() {
+        return Task.super.isThreadsShare(); // explicitly keeps the interface default
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/TestTask.java
similarity index 76%
rename from seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/TestTask.java
index 21ae87bfe..8fbc6aa9a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/TestTask.java
@@ -15,10 +15,7 @@
  * limitations under the License.
  */
 
-package execution;
-
-import org.apache.seatunnel.engine.server.execution.ProgressState;
-import org.apache.seatunnel.engine.server.execution.Task;
+package org.apache.seatunnel.engine.server.execution;
 
 import com.hazelcast.logging.ILogger;
 import lombok.NonNull;
@@ -31,11 +28,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class TestTask implements Task {
 
     AtomicBoolean stop;
+    long sleep; // passed to Thread.sleep in call()
     private final ILogger logger;
+    boolean isThreadsShare; // returned as-is by isThreadsShare()
 
-    public TestTask(AtomicBoolean stop, ILogger logger){
+    public TestTask(AtomicBoolean stop, ILogger logger, long sleep, boolean isThreadsShare){
         this.stop = stop;
         this.logger = logger;
+        this.sleep = sleep; // milliseconds call() sleeps per invocation while stop is unset
+        this.isThreadsShare = isThreadsShare; // value isThreadsShare() will report
     }
 
     @NonNull
@@ -43,9 +44,9 @@ public class TestTask implements Task {
     public ProgressState call() {
         ProgressState progressState;
         if (!stop.get()){
-            logger.info("TestTasklet is running");
+            logger.info("TestTask is running.........");
             try {
-                Thread.sleep(1000);
+                Thread.sleep(sleep);
             } catch (InterruptedException e) {
             }
             progressState = ProgressState.MADE_PROGRESS;
@@ -58,6 +59,11 @@ public class TestTask implements Task {
     @NonNull
     @Override
     public Long getTaskID() {
-        return 1L;
+        return (long) this.hashCode(); // hash code widened to long; uniqueness is not guaranteed
+    }
+
+    @Override
+    public boolean isThreadsShare() {
+        return isThreadsShare; // flag supplied through the constructor
     }
 }