Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/09 05:39:50 UTC

[incubator-seatunnel] branch dev updated: [hotfix][ST-Engine] Fix Serializable error (#2997)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a84379259 [hotfix][ST-Engine] Fix Serializable error (#2997)
a84379259 is described below

commit a84379259a7ad60ffcf6c5801296b07867c0cc5d
Author: Eric <ga...@gmail.com>
AuthorDate: Sun Oct 9 13:39:45 2022 +0800

    [hotfix][ST-Engine] Fix Serializable error (#2997)
    
    * Fix ResourceProfile and Checkpoint DataSerializable error
    
    * fix CheckpointManager NPT
    
    * run initPluginDir before all test case
    
    * fix check style
    
    * improve test code
    
    * remove try catch in test
    
    * Rollback unnecessary modifications
    
    * Rollback unnecessary modifications
    
    * fix ci error
    
    * retry ci
    
    * retry ci
    
    * revert ignore update
    
    * fix check style
    
    * fix ci error
    
    * Update seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
    
    Co-authored-by: Hisoka <fa...@qq.com>
    
    * fix review problem
    
    * fix ci error
    
    Co-authored-by: Hisoka <fa...@qq.com>
---
 .../apache/seatunnel/common/utils/FileUtils.java   | 83 +++++++++++++++++++
 .../connector-seatunnel-e2e-base/pom.xml           | 20 ++++-
 .../engine/e2e/ClusterFaultToleranceIT.java        | 95 ++++++++++++++++++++++
 .../seatunnel/engine/e2e/JobExecutionIT.java       | 80 +++++++-----------
 .../org/apache/seatunnel/engine/e2e/TestUtils.java | 85 +++++++------------
 .../test/resources/batch_fakesource_to_file.conf   | 52 +++++++++---
 .../batch_fakesource_to_file_complex.conf          | 92 ++++++++++++++++++---
 ... cluster_batch_fake_to_localfile_template.conf} | 53 +++++++++---
 .../streaming_fakesource_to_file_complex.conf      | 94 +++++++++++++++++----
 .../src/main/resources/hazelcast-client.yaml       | 14 ++++
 .../engine/server/SeaTunnelServerStarter.java      | 21 ++++-
 .../engine/server/TaskExecutionService.java        |  2 +-
 .../operation/NotifyTaskRestoreOperation.java      |  2 +-
 .../operation/TaskAcknowledgeOperation.java        |  4 +-
 .../operation/TaskReportStatusOperation.java       |  2 +
 .../resourcemanager/resource/SlotProfile.java      | 18 ++--
 .../resourcemanager/worker/WorkerProfile.java      | 15 ++--
 .../serializable/ResourceDataSerializerHook.java   | 11 ++-
 .../serializable/TaskDataSerializerHook.java       |  5 ++
 .../server/service/slot/SlotAndWorkerProfile.java  | 38 ++++++++-
 .../operation/GetTaskGroupAddressOperation.java    |  6 +-
 .../operation/NotifyTaskStatusOperation.java       | 41 +++++++++-
 .../checkpoint/CloseRequestOperation.java          |  4 +-
 .../task/operation/sink/SinkRegisterOperation.java |  8 +-
 .../operation/source/AssignSplitOperation.java     |  4 +-
 .../operation/source/RequestSplitOperation.java    |  4 +-
 .../operation/source/SourceRegisterOperation.java  |  4 +-
 .../engine/server/AbstractSeaTunnelServerTest.java |  5 +-
 .../engine/server/CoordinatorServiceTest.java      | 14 ++--
 .../apache/seatunnel/engine/server/TestUtils.java  | 14 ----
 30 files changed, 661 insertions(+), 229 deletions(-)
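
The recurring serialization fix in this diff (SlotProfile, WorkerProfile, SlotAndWorkerProfile, and the task operations) is to stop calling writeData/readData directly on nested fields and instead pass them through ObjectDataOutput.writeObject / ObjectDataInput.readObject, which tolerates null fields and does not require a pre-built instance on the reading side. A minimal, self-contained sketch of that pattern follows (hypothetical class names and placeholder factory/class ids, not code from this commit; the real classes register their ids through a DataSerializerHook, as ResourceDataSerializerHook does below):

    import com.hazelcast.nio.ObjectDataInput;
    import com.hazelcast.nio.ObjectDataOutput;
    import com.hazelcast.nio.serialization.IdentifiedDataSerializable;

    import java.io.IOException;

    public class ProfileSketch implements IdentifiedDataSerializable {

        // Nested serializable payload; stands in for fields like SlotProfile.worker.
        public static class PayloadSketch implements IdentifiedDataSerializable {
            private int value;

            @Override
            public void writeData(ObjectDataOutput out) throws IOException {
                out.writeInt(value);
            }

            @Override
            public void readData(ObjectDataInput in) throws IOException {
                value = in.readInt();
            }

            @Override
            public int getFactoryId() {
                return 1; // placeholder; real ids come from a DataSerializerHook
            }

            @Override
            public int getClassId() {
                return 1;
            }
        }

        private PayloadSketch payload; // non-final so readData can assign it

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeObject(payload); // was: payload.writeData(out)
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            payload = in.readObject(); // was: payload.readData(in)
        }

        @Override
        public int getFactoryId() {
            return 1;
        }

        @Override
        public int getClassId() {
            return 2;
        }
    }

The trade-off, roughly, is a small per-field serialization header in exchange for null-safety and not having to mutate a pre-existing (possibly final) field while reading.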

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
new file mode 100644
index 000000000..ae1dd1cf1
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import org.apache.seatunnel.common.ExceptionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+@Slf4j
+public class FileUtils {
+
+    public static String readFileToStr(Path path) {
+        try {
+            byte[] bytes = Files.readAllBytes(path);
+            return new String(bytes);
+        } catch (IOException e) {
+            log.error(ExceptionUtil.getMessage(e));
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void writeStringToFile(String filePath, String str) {
+        PrintStream ps = null;
+        try {
+            File file = new File(filePath);
+            ps = new PrintStream(new FileOutputStream(file));
+            ps.println(str);
+        } catch (FileNotFoundException e) {
+            log.error(ExceptionUtil.getMessage(e));
+            throw new RuntimeException(e);
+        } finally {
+            if (ps != null) {
+                ps.close();
+            }
+        }
+    }
+
+    public static void createParentFile(File file) {
+        File parentFile = file.getParentFile();
+        if (null != parentFile && !parentFile.exists()) {
+            parentFile.mkdirs();
+            createParentFile(parentFile);
+        }
+    }
+
+    /**
+     * create a new file, delete the old one if it is exists.
+     * @param filePath filePath
+     */
+    public static void createNewFile(String filePath) {
+        File file = new File(filePath);
+        if (file.exists()) {
+            file.delete();
+        }
+
+        if (!file.getParentFile().exists()) {
+            createParentFile(file);
+        }
+    }
+}
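
The new FileUtils above is consumed by the e2e TestUtils further down in this diff; a minimal usage sketch, assuming a writable /tmp directory (hypothetical file name, not part of the commit):

    import org.apache.seatunnel.common.utils.FileUtils;

    import java.nio.file.Paths;

    public class FileUtilsUsageSketch {
        public static void main(String[] args) {
            String path = "/tmp/file_utils_sketch/demo.conf";
            // Removes a stale file if present and creates missing parent directories.
            FileUtils.createNewFile(path);
            // writeStringToFile appends a trailing newline via PrintStream.println.
            FileUtils.writeStringToFile(path, "job.mode = \"BATCH\"");
            System.out.println(FileUtils.readFileToStr(Paths.get(path)));
        }
    }
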
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index ca51d8d6a..29077000c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -33,10 +33,28 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-local</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-hadoop-2</artifactId>
-            <scope>compile</scope>
+            <scope>test</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>avro</artifactId>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
new file mode 100644
index 000000000..76785bb9c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cluster fault tolerance test. Test the job recovery capability and data consistency assurance capability in case of cluster node failure
+ */
+public class ClusterFaultToleranceIT {
+
+    @Test
+    public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedException {
+        HazelcastInstanceImpl node1 =
+            SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+
+        HazelcastInstanceImpl node2 =
+            SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+
+        HazelcastInstanceImpl node3 =
+            SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+
+        // waiting all node added to cluster
+        Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals(3, node1.getCluster().getMembers().size()));
+
+        // TODO Need FakeSource support parallel first
+        Common.setDeployMode(DeployMode.CLIENT);
+        String targetJobConfigNamePrefix = "cluster_batch_fake_to_localfile_fine";
+        Map<String, String> valueMap = new HashMap<>();
+        valueMap.put("target_test_job_config_file_name", targetJobConfigNamePrefix);
+
+        String targetConfigFilePath =
+            File.separator + "tmp" + File.separator + "test_conf" + File.separator + targetJobConfigNamePrefix +
+                ".conf";
+        TestUtils.createTestConfigFileFromTemplate("cluster_batch_fake_to_localfile_template.conf", valueMap,
+            targetConfigFilePath);
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("test_cluster_fault_worker_batch_job");
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(
+            TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(targetConfigFilePath, jobConfig);
+        ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+        CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+            return clientJobProxy.waitForJobComplete();
+        });
+
+        Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertTrue(
+                objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index ada5c4fac..c483c61b5 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -24,21 +24,16 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
-import com.google.common.collect.Lists;
 import com.hazelcast.client.config.ClientConfig;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -46,19 +41,14 @@ import java.util.concurrent.TimeUnit;
 public class JobExecutionIT {
     @BeforeAll
     public static void beforeClass() throws Exception {
-        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
-        seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
-            Thread.currentThread().getName(),
-            new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
+        SeaTunnelServerStarter.createHazelcastInstance(TestUtils.getClusterName("JobExecutionIT"));
     }
 
     @Test
     public void testSayHello() {
-        SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
-        seaTunnelClientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
 
         String msg = "Hello world";
         String s = engineClient.printMessageToMaster(msg);
@@ -66,10 +56,9 @@ public class JobExecutionIT {
     }
 
     @Test
-    public void testExecuteJob() throws IOException {
-        TestUtils.initPluginDir();
+    public void testExecuteJob() throws Exception {
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
+        String filePath = TestUtils.getResource("batch_fakesource_to_file.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_file");
 
@@ -78,27 +67,22 @@ public class JobExecutionIT {
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
 
-        try {
-            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
 
-            CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
-                return clientJobProxy.waitForJobComplete();
-            });
+        CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+            return clientJobProxy.waitForJobComplete();
+        });
 
-            Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertTrue(
-                    objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+        Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertTrue(
+                objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
 
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
     }
 
     @Test
-    public void cancelJobTest() throws IOException {
-        TestUtils.initPluginDir();
+    public void cancelJobTest() throws Exception {
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = TestUtils.getResource("/streaming_fakesource_to_file_complex.conf");
+        String filePath = TestUtils.getResource("streaming_fakesource_to_file_complex.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_file");
 
@@ -107,23 +91,19 @@ public class JobExecutionIT {
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
 
-        try {
-            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-            JobStatus jobStatus1 = clientJobProxy.getJobStatus();
-            Assertions.assertFalse(jobStatus1.isEndState());
-            ClientJobProxy finalClientJobProxy = clientJobProxy;
-            CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
-                return finalClientJobProxy.waitForJobComplete();
-            });
-            Thread.sleep(1000);
-            clientJobProxy.cancelJob();
-
-            Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertTrue(
-                    objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
-
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+        JobStatus jobStatus1 = clientJobProxy.getJobStatus();
+        Assertions.assertFalse(jobStatus1.isEndState());
+        ClientJobProxy finalClientJobProxy = clientJobProxy;
+        CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+            return finalClientJobProxy.waitForJobComplete();
+        });
+        Thread.sleep(1000);
+        clientJobProxy.cancelJob();
+
+        Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertTrue(
+                objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
     }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
index e115b5af8..b9e13e3bc 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -17,71 +17,42 @@
 
 package org.apache.seatunnel.engine.e2e;
 
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.VariablesSubstitute;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitOption;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.Map;
 
+@Slf4j
 public class TestUtils {
     public static String getResource(String confFile) {
-        return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+        return System.getProperty("user.dir") + File.separator + "src" + File.separator + "test" + File.separator +
+            "resources" + File.separator + confFile;
     }
 
-    public static String getClusterName(String testClassName) {
-        return System.getProperty("user.name") + "_" + testClassName;
+    /**
+     * For reduce the config files num, we can define a job config template and then create new job config file base on it.
+     *
+     * @param templateFile   The basic job configuration file, which often contains some content that needs to be replaced
+     *                       at runtime, generates a new final job configuration file for testing after replacement
+     * @param valueMap       replace kv
+     * @param targetFilePath The new config file path
+     */
+    public static void createTestConfigFileFromTemplate(@NonNull String templateFile,
+                                                        @NonNull Map<String, String> valueMap,
+                                                        @NonNull String targetFilePath) {
+        String templateFilePath = getResource(templateFile);
+        String confContent = FileUtils.readFileToStr(Paths.get(templateFilePath));
+        String targetConfContent = VariablesSubstitute.substitute(confContent, valueMap);
+        FileUtils.createNewFile(targetFilePath);
+        FileUtils.writeStringToFile(targetFilePath, targetConfContent);
     }
 
-    @SuppressWarnings("checkstyle:MagicNumber")
-    public static void initPluginDir() throws IOException {
-        // TODO change connector get method
-        // copy connectors to project_root/connectors dir
-        System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
-            String.format("%s..%s..%s..%s", File.separator, File.separator, File.separator, File.separator));
-        File seatunnelRootDir = new File(System.getProperty("SEATUNNEL_HOME"));
-
-        File connectorDir = new File(seatunnelRootDir +
-            File.separator +
-            "connectors/seatunnel");
-
-        if (connectorDir.exists()) {
-            connectorDir.delete();
-        }
-
-        connectorDir.mkdirs();
-
-        File connectorDistDir = new File(
-            seatunnelRootDir +
-                File.separator +
-                "seatunnel-connectors-v2");
-        try (Stream<Path> paths = Files.walk(connectorDistDir.toPath(), 6, FileVisitOption.FOLLOW_LINKS)) {
-            paths.filter(path -> path.getFileName().toFile().getName().startsWith("connector-") && path.getFileName().toFile().getName().endsWith(".jar") && !path.getFileName().toString().contains("javadoc"))
-                .collect(Collectors.toSet()).forEach(file -> {
-                    Path copied = Paths.get(connectorDir + File.separator + file.toFile().getName());
-                    Path originalPath = file.toAbsolutePath();
-                    try {
-                        Files.copy(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                });
-        }
-
-        Path targetPluginMappingFile = Paths.get(seatunnelRootDir +
-            File.separator +
-            "connectors" +
-            File.separator +
-            "plugin-mapping.properties");
-
-        Path sourcePluginMappingFile = Paths.get(seatunnelRootDir + File.separator + "plugin-mapping.properties");
-        try {
-            Files.copy(sourcePluginMappingFile, targetPluginMappingFile, StandardCopyOption.REPLACE_EXISTING);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+    public static String getClusterName(String testClassName) {
+        return System.getProperty("user.name") + "_" + testClassName;
     }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index 8c78ced4c..4431bf67c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -20,7 +20,6 @@
 
 env {
   # You can set flink configuration here
-  execution.parallelism = 1
   job.mode = "BATCH"
   execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -29,16 +28,50 @@ env {
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
+      row.num = 10
+      map.size = 10
+      array.size = 10
+      bytes.length = 10
+      string.length = 10
+      parallelism = 1
       result_table_name = "fake"
-      schema {
+      schema = {
         fields {
-          name = "string"
-          age = "int"
+          c_map = "map<string, array<int>>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_null = "null"
+          c_bytes = bytes
+          c_date = date
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, map<string, string>>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(30, 8)"
+            c_null = "null"
+            c_bytes = bytes
+            c_date = date
+            c_timestamp = timestamp
+          }
         }
       }
-      parallelism = 1
     }
-
 }
 
 transform {
@@ -46,19 +79,16 @@ transform {
 
 sink {
   LocalFile {
-    path="/tmp/hive/warehouse/test2"
+    path="/tmp/hive/warehouse/test1"
     field_delimiter="\t"
     row_delimiter="\n"
-    partition_by=["age"]
+    partition_by=["c_string"]
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
     file_format="text"
-    sink_columns=["name","age"]
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error"
-
   }
-
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index ddc450819..8d8afe530 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -20,7 +20,6 @@
 
 env {
   # You can set flink configuration here
-  execution.parallelism = 1
   job.mode = "BATCH"
   execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -29,27 +28,96 @@ env {
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
+      row.num = 10
+      map.size = 10
+      array.size = 10
+      bytes.length = 10
+      string.length = 10
+      parallelism = 1
       result_table_name = "fake"
-      schema {
+      schema = {
         fields {
-          name = "string"
-          age = "int"
+          c_map = "map<string, array<int>>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_null = "null"
+          c_bytes = bytes
+          c_date = date
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, map<string, string>>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(30, 8)"
+            c_null = "null"
+            c_bytes = bytes
+            c_date = date
+            c_timestamp = timestamp
+          }
         }
       }
-      parallelism = 1
     }
 
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
+      row.num = 10
+      map.size = 10
+      array.size = 10
+      bytes.length = 10
+      string.length = 10
       result_table_name = "fake"
-      schema {
+      schema = {
         fields {
-          name = "string"
-          age = "int"
+          c_map = "map<string, array<int>>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_null = "null"
+          c_bytes = bytes
+          c_date = date
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, map<string, string>>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(30, 8)"
+            c_null = "null"
+            c_bytes = bytes
+            c_date = date
+            c_timestamp = timestamp
+          }
         }
       }
-      parallelism = 1
     }
-
 }
 
 transform {
@@ -60,16 +128,14 @@ sink {
     path="/tmp/hive/warehouse/test2"
     field_delimiter="\t"
     row_delimiter="\n"
-    partition_by=["age"]
+    partition_by=["c_string"]
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
     file_format="text"
-    sink_columns=["name","age"]
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error",
     source_table_name="fake"
   }
-
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
similarity index 55%
copy from seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index 8c78ced4c..bd843c6dd 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -20,7 +20,6 @@
 
 env {
   # You can set flink configuration here
-  execution.parallelism = 1
   job.mode = "BATCH"
   execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -29,16 +28,49 @@ env {
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
-      result_table_name = "fake"
-      schema {
+      row.num = 10
+      map.size = 10
+      array.size = 10
+      bytes.length = 10
+      string.length = 10
+      parallelism = 1
+      schema = {
         fields {
-          name = "string"
-          age = "int"
+          c_map = "map<string, array<int>>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_null = "null"
+          c_bytes = bytes
+          c_date = date
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, map<string, string>>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(30, 8)"
+            c_null = "null"
+            c_bytes = bytes
+            c_date = date
+            c_timestamp = timestamp
+          }
         }
       }
-      parallelism = 1
     }
-
 }
 
 transform {
@@ -46,19 +78,16 @@ transform {
 
 sink {
   LocalFile {
-    path="/tmp/hive/warehouse/test2"
+    path="/tmp/hive/warehouse/${target_test_job_config_file_name}" # target_test_job_config_file_name will be replace to the final file name before test run
     field_delimiter="\t"
     row_delimiter="\n"
-    partition_by=["age"]
+    partition_by=["c_string"]
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
     file_format="text"
-    sink_columns=["name","age"]
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error"
-
   }
-
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index d454ac791..f2c8e5fee 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -20,7 +20,6 @@
 
 env {
   # You can set flink configuration here
-  execution.parallelism = 1
   job.mode = "STREAMING"
   execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -29,27 +28,96 @@ env {
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
+      row.num = 10
+      map.size = 10
+      array.size = 10
+      bytes.length = 10
+      string.length = 10
       result_table_name = "fake"
-      schema {
+      parallelism = 1
+      schema = {
         fields {
-          name = "string"
-          age = "int"
+          c_map = "map<string, array<int>>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_null = "null"
+          c_bytes = bytes
+          c_date = date
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, map<string, string>>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(30, 8)"
+            c_null = "null"
+            c_bytes = bytes
+            c_date = date
+            c_timestamp = timestamp
+          }
         }
       }
-      parallelism = 1
     }
 
     FakeSource {
+      row.num = 10
+      map.size = 10
+      array.size = 10
+      bytes.length = 10
+      string.length = 10
       result_table_name = "fake"
-      schema {
+      parallelism = 1
+      schema = {
         fields {
-          name = "string"
-          age = "int"
+          c_map = "map<string, array<int>>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_null = "null"
+          c_bytes = bytes
+          c_date = date
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, map<string, string>>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(30, 8)"
+            c_null = "null"
+            c_bytes = bytes
+            c_date = date
+            c_timestamp = timestamp
+          }
         }
       }
-      parallelism = 1
     }
-
 }
 
 transform {
@@ -57,19 +125,17 @@ transform {
 
 sink {
   LocalFile {
-    path="/tmp/hive/warehouse/test2"
+    path="/tmp/hive/warehouse/test3"
     field_delimiter="\t"
     row_delimiter="\n"
-    partition_by=["age"]
+    partition_by=["c_string"]
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
     file_format="text"
-    sink_columns=["name","age"]
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error",
     source_table_name="fake"
   }
-
 }
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
index a5fe3bc42..9552c382e 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
@@ -21,3 +21,17 @@ hazelcast-client:
   network:
     cluster-members:
       - localhost:5801
+      - localhost:5802
+      - localhost:5803
+      - localhost:5804
+      - localhost:5805
+      - localhost:5806
+      - localhost:5807
+      - localhost:5808
+      - localhost:5809
+      - localhost:5810
+      - localhost:5811
+      - localhost:5812
+      - localhost:5813
+      - localhost:5814
+      - localhost:5815
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 0104c1e55..447a7fe7c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -21,12 +21,29 @@ import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
 
 public class SeaTunnelServerStarter {
 
     public static void main(String[] args) {
+        createHazelcastInstance();
+    }
+
+    public static HazelcastInstanceImpl createHazelcastInstance(String clusterName) {
+        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
+        return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
+            seaTunnelConfig.getHazelcastConfig(),
+            HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
+            new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+    }
+
+    public static HazelcastInstanceImpl createHazelcastInstance() {
         SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
-        HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
-            Thread.currentThread().getName(), new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
+        return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
+            seaTunnelConfig.getHazelcastConfig(),
+            HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
+            new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index d2d8b67d6..6f88f89dc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -38,8 +38,8 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
-import org.apache.seatunnel.engine.server.operation.NotifyTaskStatusOperation;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
+import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
 
 import com.google.common.collect.Lists;
 import com.hazelcast.internal.serialization.Data;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
index 9bed053ff..ec282e037 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
@@ -71,7 +71,7 @@ public class NotifyTaskRestoreOperation extends TaskOperation {
         int size = in.readInt();
         this.restoredState = new ArrayList<>(size);
         for (int i = 0; i < size; i++) {
-            restoredState.add(in.readObject(ActionSubtaskState.class));
+            restoredState.add(in.readObject());
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskAcknowledgeOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskAcknowledgeOperation.java
index 64e609048..5a5e16c54 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskAcknowledgeOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskAcknowledgeOperation.java
@@ -65,8 +65,8 @@ public class TaskAcknowledgeOperation extends Operation implements IdentifiedDat
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        taskLocation = in.readObject(TaskLocation.class);
-        barrier = in.readObject(CheckpointBarrier.class);
+        taskLocation = in.readObject();
+        barrier = in.readObject();
         states = in.readObject();
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
index e515387c4..d5ea0d078 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
@@ -53,11 +53,13 @@ public class TaskReportStatusOperation extends Operation implements IdentifiedDa
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         out.writeObject(location);
+        out.writeObject(status);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         location = in.readObject(TaskLocation.class);
+        status = in.readObject();
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
index 9e4794dbf..b7a53de85 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
@@ -31,7 +31,7 @@ import java.io.IOException;
  */
 public class SlotProfile implements IdentifiedDataSerializable {
 
-    private final Address worker;
+    private Address worker;
 
     private int slotID;
 
@@ -83,12 +83,12 @@ public class SlotProfile implements IdentifiedDataSerializable {
     @Override
     public String toString() {
         return "SlotProfile{" +
-                "worker=" + worker +
-                ", slotID=" + slotID +
-                ", ownerJobID=" + ownerJobID +
-                ", assigned=" + assigned +
-                ", resourceProfile=" + resourceProfile +
-                '}';
+            "worker=" + worker +
+            ", slotID=" + slotID +
+            ", ownerJobID=" + ownerJobID +
+            ", assigned=" + assigned +
+            ", resourceProfile=" + resourceProfile +
+            '}';
     }
 
     @Override
@@ -103,7 +103,7 @@ public class SlotProfile implements IdentifiedDataSerializable {
 
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
-        worker.writeData(out);
+        out.writeObject(worker);
         out.writeInt(slotID);
         out.writeLong(ownerJobID);
         out.writeBoolean(assigned);
@@ -112,7 +112,7 @@ public class SlotProfile implements IdentifiedDataSerializable {
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
-        worker.readData(in);
+        worker = in.readObject();
         slotID = in.readInt();
         ownerJobID = in.readLong();
         assigned = in.readBoolean();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index fb83a7dfd..e8a73d338 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -66,36 +66,33 @@ public class WorkerProfile implements IdentifiedDataSerializable {
 
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
-        address.writeData(out);
+        out.writeObject(address);
         out.writeObject(profile);
         out.writeObject(unassignedResource);
         out.writeInt(assignedSlots.length);
         for (SlotProfile assignedSlot : assignedSlots) {
-            assignedSlot.writeData(out);
+            out.writeObject(assignedSlot);
         }
         out.writeInt(unassignedSlots.length);
         for (SlotProfile unassignedSlot : unassignedSlots) {
-            unassignedSlot.writeData(out);
+            out.writeObject(unassignedSlot);
         }
-        out.writeObject(unassignedSlots);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
-        address.readData(in);
+        address = in.readObject();
         profile = in.readObject();
         unassignedResource = in.readObject();
         int assignedSlotsLength = in.readInt();
         assignedSlots = new SlotProfile[assignedSlotsLength];
         for (int i = 0; i < assignedSlots.length; i++) {
-            assignedSlots[i] = new SlotProfile();
-            assignedSlots[i].readData(in);
+            assignedSlots[i] = in.readObject();
         }
         int unassignedSlotsLength = in.readInt();
         unassignedSlots = new SlotProfile[unassignedSlotsLength];
         for (int i = 0; i < unassignedSlots.length; i++) {
-            unassignedSlots[i] = new SlotProfile();
-            unassignedSlots[i].readData(in);
+            unassignedSlots[i] = in.readObject();
         }
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index 4ec2e16a5..28710284c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourc
 import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -45,9 +46,11 @@ public class ResourceDataSerializerHook implements DataSerializerHook {
 
     public static final int SLOT_PROFILE_TYPE = 6;
 
+    public static final int SLOT_AND_WORKER_PROFILE = 7;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
-            SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
-            SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
+        SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
+        SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
     );
 
     @Override
@@ -75,8 +78,10 @@ public class ResourceDataSerializerHook implements DataSerializerHook {
                     return new ResetResourceOperation();
                 case WORKER_PROFILE_TYPE:
                     return new WorkerProfile();
-                case  SLOT_PROFILE_TYPE:
+                case SLOT_PROFILE_TYPE:
                     return new SlotProfile();
+                case SLOT_AND_WORKER_PROFILE:
+                    return new SlotAndWorkerProfile();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 828de27ae..19c2d7114 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
 import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
 import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
+import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
 import org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequestOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
@@ -66,6 +67,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
 
     public static final int RESTORED_SPLIT_OPERATOR = 14;
 
+    public static final int NOTIFY_TASK_STATUS_OPERATOR = 15;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -114,6 +117,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
                     return new GetTaskGroupAddressOperation();
                 case RESTORED_SPLIT_OPERATOR:
                     return new RestoredSplitOperation();
+                case NOTIFY_TASK_STATUS_OPERATOR:
+                    return new NotifyTaskStatusOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
index 2523e79d2..402b79656 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
@@ -19,14 +19,22 @@ package org.apache.seatunnel.engine.server.service.slot;
 
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
 
-import java.io.Serializable;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 
-public class SlotAndWorkerProfile implements Serializable {
+import java.io.IOException;
 
-    private final WorkerProfile workerProfile;
+public class SlotAndWorkerProfile implements IdentifiedDataSerializable {
 
-    private final SlotProfile slotProfile;
+    private WorkerProfile workerProfile;
+
+    private SlotProfile slotProfile;
+
+    public SlotAndWorkerProfile() {
+    }
 
     public SlotAndWorkerProfile(WorkerProfile workerProfile, SlotProfile slotProfile) {
         this.workerProfile = workerProfile;
@@ -43,4 +51,26 @@ public class SlotAndWorkerProfile implements Serializable {
     public SlotProfile getSlotProfile() {
         return slotProfile;
     }
+
+    @Override
+    public int getFactoryId() {
+        return ResourceDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ResourceDataSerializerHook.SLOT_AND_WORKER_PROFILE;
+    }
+
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeObject(workerProfile);
+        out.writeObject(slotProfile);
+    }
+
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        workerProfile = in.readObject();
+        slotProfile = in.readObject();
+    }
 }
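
    For readers unfamiliar with the pattern: switching from java.io.Serializable to
    Hazelcast's IdentifiedDataSerializable, as done above, requires a public no-arg
    constructor, non-final fields (readData assigns them after construction), stable
    factory/class ids, and writeData/readData methods whose field order mirrors each
    other. A minimal self-contained sketch under those assumptions (ExamplePayload
    and its ids are hypothetical, not part of this commit):

    import com.hazelcast.nio.ObjectDataInput;
    import com.hazelcast.nio.ObjectDataOutput;
    import com.hazelcast.nio.serialization.IdentifiedDataSerializable;

    import java.io.IOException;

    public class ExamplePayload implements IdentifiedDataSerializable {

        // Fields cannot be final: readData populates them after the no-arg constructor runs.
        private String name;
        private long value;

        public ExamplePayload() {
        }

        public ExamplePayload(String name, long value) {
            this.name = name;
            this.value = value;
        }

        @Override
        public int getFactoryId() {
            return 1;  // hypothetical; normally a constant exposed by the DataSerializerHook
        }

        @Override
        public int getClassId() {
            return 1;  // hypothetical; must match the id registered in the factory
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(value);
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            // Read order must mirror the write order exactly.
            name = in.readUTF();
            value = in.readLong();
        }
    }
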
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 6a788c3e1..7f69773b1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -66,13 +66,13 @@ public class GetTaskGroupAddressOperation extends Operation implements Identifie
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        taskLocation.writeData(out);
+        out.writeObject(taskLocation);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        taskLocation.readData(in);
+        taskLocation = in.readObject();
     }
 
     @Override
@@ -82,6 +82,6 @@ public class GetTaskGroupAddressOperation extends Operation implements Identifie
 
     @Override
     public int getClassId() {
-        return TaskDataSerializerHook.CLOSE_REQUEST_TYPE;
+        return TaskDataSerializerHook.GET_TASKGROUP_ADDRESS_TYPE;
     }
 }
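
    The recurring replacement of field.writeData(out) / field.readData(in) with
    out.writeObject(field) / field = in.readObject() in this and the following
    operations addresses the deserialization path: the receiving side builds the
    operation through its no-arg constructor, so the field is still null when
    readInternal runs, and calling readData on it throws a NullPointerException.
    writeObject/readObject let Hazelcast create and populate the nested object
    instead. (The getClassId change above also stops this operation from reusing
    CLOSE_REQUEST_TYPE.) A method-level sketch of the pattern, assuming a field
    named location of an IdentifiedDataSerializable type (names hypothetical):

    @Override
    protected void writeInternal(ObjectDataOutput out) throws IOException {
        super.writeInternal(out);
        out.writeObject(location);   // serializes the nested object, even a null reference
    }

    @Override
    protected void readInternal(ObjectDataInput in) throws IOException {
        super.readInternal(in);
        location = in.readObject();  // Hazelcast instantiates the object; no NPE on the null field
    }
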
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/NotifyTaskStatusOperation.java
similarity index 56%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/NotifyTaskStatusOperation.java
index 501dc468d..85216cb74 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/NotifyTaskStatusOperation.java
@@ -15,18 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.operation;
+package org.apache.seatunnel.engine.server.task.operation;
 
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
-public class NotifyTaskStatusOperation extends Operation {
+import java.io.IOException;
 
-    private final TaskGroupLocation taskGroupLocation;
-    private final TaskExecutionState taskExecutionState;
+public class NotifyTaskStatusOperation extends Operation implements IdentifiedDataSerializable {
+
+    private TaskGroupLocation taskGroupLocation;
+    private TaskExecutionState taskExecutionState;
+
+    public NotifyTaskStatusOperation() {
+    }
 
     public NotifyTaskStatusOperation(TaskGroupLocation taskGroupLocation, TaskExecutionState taskExecutionState) {
         super();
@@ -34,6 +43,30 @@ public class NotifyTaskStatusOperation extends Operation {
         this.taskExecutionState = taskExecutionState;
     }
 
+    @Override
+    public final int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.NOTIFY_TASK_STATUS_OPERATOR;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeObject(taskGroupLocation);
+        out.writeObject(taskExecutionState);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        taskGroupLocation = in.readObject();
+        taskExecutionState = in.readObject();
+    }
+
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
index 859403b66..fd46082ab 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
@@ -62,13 +62,13 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        readerLocation.writeData(out);
+        out.writeObject(readerLocation);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        readerLocation.readData(in);
+        readerLocation = in.readObject();
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index 1a326571d..77352339b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -68,15 +68,15 @@ public class SinkRegisterOperation extends Operation implements IdentifiedDataSe
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        writerTaskID.writeData(out);
-        committerTaskID.writeData(out);
+        out.writeObject(writerTaskID);
+        out.writeObject(committerTaskID);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        writerTaskID.readData(in);
-        committerTaskID.readData(in);
+        writerTaskID = in.readObject();
+        committerTaskID = in.readObject();
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 3eb20042b..7f42e4aed 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -65,13 +65,13 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         out.writeByteArray(splits);
-        taskID.writeData(out);
+        out.writeObject(taskID);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         splits = in.readByteArray();
-        taskID.readData(in);
+        taskID = in.readObject();
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 13536215d..db7803c1d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -66,8 +66,8 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        taskID.writeData(out);
-        enumeratorTaskID.writeData(out);
+        out.writeObject(taskID);
+        out.writeObject(enumeratorTaskID);
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 005cda41e..fef29aa57 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -80,8 +80,8 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        readerTaskID.readData(in);
-        enumeratorTaskID.readData(in);
+        readerTaskID = in.readObject();
+        enumeratorTaskID = in.readObject();
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 1d1e0105b..403bbd128 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -36,8 +36,9 @@ public abstract class AbstractSeaTunnelServerTest {
     protected static ILogger LOGGER;
 
     @BeforeAll
-    public void before() {
-        instance = TestUtils.createHazelcastInstance("AbstractSeaTunnelServerTest" + "_" + System.currentTimeMillis());
+    public void before() {
+        instance = SeaTunnelServerStarter.createHazelcastInstance(
+            TestUtils.getClusterName("AbstractSeaTunnelServerTest_" + System.currentTimeMillis()));
         nodeEngine = instance.node.nodeEngine;
         server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
         LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 3de0be829..1397206fe 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -39,8 +39,10 @@ import java.util.concurrent.TimeUnit;
 public class CoordinatorServiceTest {
     @Test
     public void testMasterNodeActive() {
-        HazelcastInstanceImpl instance1 = TestUtils.createHazelcastInstance("CoordinatorServiceTest_testMasterNodeActive");
-        HazelcastInstanceImpl instance2 = TestUtils.createHazelcastInstance("CoordinatorServiceTest_testMasterNodeActive");
+        HazelcastInstanceImpl instance1 = SeaTunnelServerStarter.createHazelcastInstance(
+            TestUtils.getClusterName("CoordinatorServiceTest_testMasterNodeActive"));
+        HazelcastInstanceImpl instance2 = SeaTunnelServerStarter.createHazelcastInstance(
+            TestUtils.getClusterName("CoordinatorServiceTest_testMasterNodeActive"));
 
         SeaTunnelServer server1 = instance1.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
         SeaTunnelServer server2 = instance2.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
@@ -57,7 +59,7 @@ public class CoordinatorServiceTest {
 
         // shutdown instance1
         instance1.shutdown();
-        await().atMost(10000, TimeUnit.MILLISECONDS)
+        await().atMost(20000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> {
                 try {
                     CoordinatorService coordinatorService = server2.getCoordinatorService();
@@ -75,7 +77,8 @@ public class CoordinatorServiceTest {
     @Test
     public void testClearCoordinatorService()
         throws MalformedURLException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-        HazelcastInstanceImpl coordinatorServiceTest = TestUtils.createHazelcastInstance("CoordinatorServiceTest_testClearCoordinatorService");
+        HazelcastInstanceImpl coordinatorServiceTest = SeaTunnelServerStarter.createHazelcastInstance(
+            TestUtils.getClusterName("CoordinatorServiceTest_testClearCoordinatorService"));
         SeaTunnelServer server1 = coordinatorServiceTest.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
         CoordinatorService coordinatorService = server1.getCoordinatorService();
         Assertions.assertTrue(coordinatorService.isCoordinatorActive());
@@ -85,7 +88,8 @@ public class CoordinatorServiceTest {
             TestUtils.createTestLogicalPlan("stream_fakesource_to_file.conf", "test_clear_coordinator_service", jobId);
 
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(jobId,
-            coordinatorServiceTest.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(), Collections.emptyList());
+            coordinatorServiceTest.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
+            Collections.emptyList());
 
         Data data = coordinatorServiceTest.getSerializationService().toData(jobImmutableInformation);
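
    In testMasterNodeActive above, the await timeout is raised from 10000 to 20000 ms,
    presumably to give the remaining node more time to detect the shutdown and promote
    its CoordinatorService to master. A minimal Awaitility sketch of that polling
    pattern (the condition below is hypothetical, not the test's real check):

    import static org.awaitility.Awaitility.await;

    import java.util.concurrent.TimeUnit;

    public class AwaitilitySketch {

        public static void main(String[] args) {
            await().atMost(20000, TimeUnit.MILLISECONDS)
                .untilAsserted(() -> {
                    // Hypothetical condition; the real test asserts that server2's
                    // CoordinatorService becomes active after instance1 shuts down.
                    if (!masterHasFailedOver()) {
                        throw new AssertionError("master has not failed over yet");
                    }
                });
        }

        private static boolean masterHasFailedOver() {
            return true; // placeholder for the real cluster check
        }
    }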
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 213cc725e..c9f802489 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -22,9 +22,7 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
-import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -36,9 +34,6 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.parse.JobConfigParser;
 
 import com.google.common.collect.Sets;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
 import java.net.MalformedURLException;
@@ -82,15 +77,6 @@ public class TestUtils {
         return System.getProperty("user.name") + "_" + testClassName;
     }
 
-    public static HazelcastInstanceImpl createHazelcastInstance(String clusterName) {
-        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
-        seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(clusterName));
-        return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
-            seaTunnelConfig.getHazelcastConfig(),
-            HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
-            new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
-    }
-
     public static LogicalDag createTestLogicalPlan(String jobConfigFile, String jobName, Long jobId) {
         Common.setDeployMode(DeployMode.CLIENT);
         JobContext jobContext = new JobContext(jobId);