You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/10/08 15:24:37 UTC

[incubator-seatunnel] branch dev updated: [Engine] [Test] Fix engine e2e problem (#3019)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5f098a06e [Engine] [Test] Fix engine e2e problem (#3019)
5f098a06e is described below

commit 5f098a06ec85ad4100598c4b5793ce8d0591f015
Author: Hisoka <fa...@qq.com>
AuthorDate: Sat Oct 8 23:24:31 2022 +0800

    [Engine] [Test] Fix engine e2e problem (#3019)
    
    * [Engine] [Test] Fix engine e2e problem
    
    * [Engine] [Test] Fix engine e2e problem
---
 .../container/seatunnel/SeaTunnelContainer.java    | 123 +++++++++++++++++++++
 .../engine/e2e/console/FakeSourceToConsoleIT.java  |   2 -
 .../src/test/resources/fakesource_to_console.conf  |   8 +-
 .../connector-seatunnel-e2e-base/pom.xml           |  14 +++
 .../seatunnel/engine/e2e/JobExecutionIT.java       |   7 +-
 .../seatunnel/engine/e2e/SeaTunnelContainer.java   |   7 +-
 .../org/apache/seatunnel/engine/e2e/TestUtils.java |  37 +++----
 .../test/resources/batch_fakesource_to_file.conf   |   9 +-
 .../batch_fakesource_to_file_complex.conf          |  18 ++-
 .../streaming_fakesource_to_file_complex.conf      |  14 ++-
 seatunnel-e2e/seatunnel-engine-e2e/pom.xml         |   7 ++
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  12 +-
 .../engine/server/AbstractSeaTunnelServerTest.java |  24 ++--
 .../engine/server/TaskExecutionServiceTest.java    |  16 +--
 .../server/checkpoint/CheckpointPlanTest.java      |  10 +-
 .../seatunnel/engine/server/dag/TaskTest.java      |  16 +--
 .../engine/server/master/JobMasterTest.java        |  24 ++--
 .../resourcemanager/ResourceManagerTest.java       |  20 ++--
 18 files changed, 271 insertions(+), 97 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
new file mode 100644
index 000000000..1ce6fb22d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.seatunnel;
+
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+@NoArgsConstructor
+//@AutoService(TestContainer.class)
+// TODO add AutoService after engine feature is ready
+public class SeaTunnelContainer extends AbstractTestContainer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelContainer.class);
+    private static final String JDK_DOCKER_IMAGE = "openjdk:8";
+    protected static final Network NETWORK = Network.newNetwork();
+
+    private static final String SEATUNNEL_HOME = "/tmp/seatunnel";
+    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
+    private static final String CLIENT_SHELL = "seatunnel.sh";
+    private static final String SERVER_SHELL = "seatunnel-cluster.sh";
+    private GenericContainer<?> server;
+
+    @Override
+    public void startUp() throws Exception {
+        server = new GenericContainer<>(getDockerImage())
+            .withNetwork(NETWORK)
+            .withCommand(Paths.get(SEATUNNEL_BIN, SERVER_SHELL).toString())
+            .withNetworkAliases("server")
+            .withExposedPorts()
+            .withLogConsumer(new Slf4jLogConsumer(LOG))
+            .waitingFor(Wait.forLogMessage(".*received new worker register.*\\n", 1));
+        server.start();
+        copySeaTunnelStarter(server);
+        // execute extra commands
+        executeExtraCommands(server);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (server != null) {
+            server.close();
+        }
+    }
+
+    @Override
+    protected String getDockerImage() {
+        return JDK_DOCKER_IMAGE;
+    }
+
+    @Override
+    protected String getStartModuleName() {
+        return "seatunnel-starter";
+    }
+
+    @Override
+    protected String getStartShellName() {
+        return CLIENT_SHELL;
+    }
+
+    @Override
+    protected String getConnectorModulePath() {
+        return "seatunnel-connectors-v2";
+    }
+
+    @Override
+    protected String getConnectorType() {
+        return "seatunnel";
+    }
+
+    @Override
+    protected String getConnectorNamePrefix() {
+        return "connector-";
+    }
+
+    @Override
+    protected List<String> getExtraStartShellCommands() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public String identifier() {
+        return "SeaTunnel";
+    }
+
+    @Override
+    public void executeExtraCommands(ContainerExtendedFactory extendedFactory) throws IOException, InterruptedException {
+        extendedFactory.extend(server);
+    }
+
+    @Override
+    public Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException {
+        return executeJob(server, confFile);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
index 5adc84ea6..832be6fc1 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
@@ -20,13 +20,11 @@ package org.apache.seatunnel.engine.e2e.console;
 import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
 
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
 
 import java.io.IOException;
 
-@Disabled("Disabled because connector-v2 jar dist not exist")
 public class FakeSourceToConsoleIT extends SeaTunnelContainer {
 
     @Test
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
index 35f8633ef..3b124fe8b 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
@@ -26,7 +26,13 @@ source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age"
+      schema {
+        fields {
+          id = "int"
+          name = "string"
+          age = "int"
+        }
+      }
     }
 }
 
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index 03f7c66bf..ca51d8d6a 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -32,6 +32,20 @@
         <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
     </properties>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2</artifactId>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>avro</artifactId>
+                    <groupId>org.apache.avro</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
     <build>
         <plugins>
             <plugin>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 03d2cca0c..ada5c4fac 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -36,14 +36,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
-@Disabled("Disabled because connector-v2 jar dist not exist")
 public class JobExecutionIT {
     @BeforeAll
     public static void beforeClass() throws Exception {
@@ -67,7 +66,7 @@ public class JobExecutionIT {
     }
 
     @Test
-    public void testExecuteJob() {
+    public void testExecuteJob() throws IOException {
         TestUtils.initPluginDir();
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
@@ -96,7 +95,7 @@ public class JobExecutionIT {
     }
 
     @Test
-    public void cancelJobTest() {
+    public void cancelJobTest() throws IOException {
         TestUtils.initPluginDir();
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/streaming_fakesource_to_file_complex.conf");
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
index 4fd45e6cb..d0d9e65e4 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.e2e;
 
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.slf4j.Logger;
@@ -79,10 +81,6 @@ public abstract class SeaTunnelContainer {
         mountMapping.put(PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-starter/src/main/bin/",
             Paths.get(SEATUNNEL_BIN).toString());
 
-        // copy connectors
-        mountMapping.put(PROJECT_ROOT_PATH +
-            "/seatunnel-connectors-v2-dist/target/lib", Paths.get(SEATUNNEL_CONNECTORS, "seatunnel").toString());
-
         // copy plugin-mapping.properties
         mountMapping.put(PROJECT_ROOT_PATH + "/plugin-mapping.properties", Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
 
@@ -91,6 +89,7 @@ public abstract class SeaTunnelContainer {
 
     @SuppressWarnings("checkstyle:MagicNumber")
     public Container.ExecResult executeSeaTunnelJob(String confFile) throws IOException, InterruptedException {
+        ContainerUtil.copyConnectorJarToContainer(SERVER, confFile, "seatunnel-connectors-v2", "connector-", "seatunnel", SEATUNNEL_HOME);
         final String confPath = getResource(confFile);
         if (!new File(confPath).exists()) {
             throw new IllegalArgumentException(confFile + " doesn't exist");
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
index 1e1b6a407..e115b5af8 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -19,11 +19,13 @@ package org.apache.seatunnel.engine.e2e;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
-import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class TestUtils {
     public static String getResource(String confFile) {
@@ -34,7 +36,8 @@ public class TestUtils {
         return System.getProperty("user.name") + "_" + testClassName;
     }
 
-    public static void initPluginDir() {
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static void initPluginDir() throws IOException {
         // TODO change connector get method
         // copy connectors to project_root/connectors dir
         System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
@@ -54,23 +57,19 @@ public class TestUtils {
         File connectorDistDir = new File(
             seatunnelRootDir +
                 File.separator +
-                "seatunnel-connectors-v2-dist" +
-                File.separator +
-                "target" +
-                File.separator +
-                "lib");
-
-        Arrays.stream(connectorDistDir.listFiles()).forEach(file -> {
-            if (file.getName().startsWith("connector-")) {
-                Path copied = Paths.get(connectorDir + File.separator + file.getName());
-                Path originalPath = file.toPath();
-                try {
-                    Files.copy(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
+                "seatunnel-connectors-v2");
+        try (Stream<Path> paths = Files.walk(connectorDistDir.toPath(), 6, FileVisitOption.FOLLOW_LINKS)) {
+            paths.filter(path -> path.getFileName().toFile().getName().startsWith("connector-") && path.getFileName().toFile().getName().endsWith(".jar") && !path.getFileName().toString().contains("javadoc"))
+                .collect(Collectors.toSet()).forEach(file -> {
+                    Path copied = Paths.get(connectorDir + File.separator + file.toFile().getName());
+                    Path originalPath = file.toAbsolutePath();
+                    try {
+                        Files.copy(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        }
 
         Path targetPluginMappingFile = Paths.get(seatunnelRootDir +
             File.separator +
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index 3cfb808b7..8c78ced4c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -30,8 +30,13 @@ source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age",
-      parallelism = 3
+      schema {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
     }
 
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index 2776e3fd1..ddc450819 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -30,14 +30,24 @@ source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age",
-      parallelism = 3
+      schema {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
     }
 
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age",
-      parallelism = 3
+      schema {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
     }
 
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index 1fec906eb..d454ac791 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -30,13 +30,23 @@ source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age",
+      schema {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
       parallelism = 1
     }
 
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age",
+      schema {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
       parallelism = 1
     }
 
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
index cfda3472d..188cd8a19 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
@@ -47,6 +47,13 @@
             <scope>test</scope>
             <classifier>tests</classifier>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-engine-client</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 684ed7904..6b764aea0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -124,18 +124,18 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
                     prepareClose = true;
                 }
                 if (barrier.snapshot()) {
-                    if (!writerStateSerializer.isPresent()) {
-                        runningTask.addState(barrier, sinkAction.getId(), Collections.emptyList());
-                    } else {
-                        List<StateT> states = writer.snapshotState(barrier.getId());
-                        runningTask.addState(barrier, sinkAction.getId(), serializeStates(writerStateSerializer.get(), states));
-                    }
                     try {
                         lastCommitInfo = writer.prepareCommit();
                     } catch (Exception e) {
                         writer.abortPrepare();
                         throw e;
                     }
+                    if (!writerStateSerializer.isPresent()) {
+                        runningTask.addState(barrier, sinkAction.getId(), Collections.emptyList());
+                    } else {
+                        List<StateT> states = writer.snapshotState(barrier.getId());
+                        runningTask.addState(barrier, sinkAction.getId(), serializeStates(writerStateSerializer.get(), states));
+                    }
                     if (containAggCommitter) {
                         lastCommitInfo.ifPresent(commitInfoT -> runningTask.getExecutionContext().sendToMember(new SinkPrepareCommitOperation(barrier, committerTaskLocation,
                             SerializationUtils.serialize(commitInfoT)), committerTaskAddress).join());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index d6458467a..1d1e0105b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -22,28 +22,30 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.spi.impl.NodeEngine;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
 
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public abstract class AbstractSeaTunnelServerTest {
 
-    protected static SeaTunnelServer SERVER;
+    protected SeaTunnelServer server;
 
-    protected static NodeEngine NODE_ENGINE;
+    protected NodeEngine nodeEngine;
 
-    protected static HazelcastInstanceImpl INSTANCE;
+    protected HazelcastInstanceImpl instance;
 
     protected static ILogger LOGGER;
 
     @BeforeAll
-    public static void before() {
-        INSTANCE = TestUtils.createHazelcastInstance("AbstractSeaTunnelServerTest" + "_" + System.currentTimeMillis());
-        NODE_ENGINE = INSTANCE.node.nodeEngine;
-        SERVER = NODE_ENGINE.getService(SeaTunnelServer.SERVICE_NAME);
-        LOGGER = NODE_ENGINE.getLogger(AbstractSeaTunnelServerTest.class);
+    public void before() {
+        instance = TestUtils.createHazelcastInstance("AbstractSeaTunnelServerTest" + "_" + System.currentTimeMillis());
+        nodeEngine = instance.node.nodeEngine;
+        server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+        LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
     }
 
     @AfterAll
-    public static void after() {
-        SERVER.shutdown(true);
-        INSTANCE.shutdown();
+    public void after() {
+        server.shutdown(true);
+        instance.shutdown();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 20a5ffc25..80d1cb42e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -55,9 +55,9 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
     int pipeLineId = 100001;
 
     @BeforeAll
-    public static void before() {
-        AbstractSeaTunnelServerTest.before();
-        FLAKE_ID_GENERATOR = INSTANCE.getFlakeIdGenerator("test");
+    public void before() {
+        super.before();
+        FLAKE_ID_GENERATOR = instance.getFlakeIdGenerator("test");
     }
 
     @Test
@@ -82,7 +82,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
     }
 
     public void testCancel() {
-        TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         long sleepTime = 300;
 
@@ -100,7 +100,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
     }
 
     public void testFinish() {
-        TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         long sleepTime = 300;
 
@@ -134,7 +134,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
         //Create tasks with critical delays
         List<Task> criticalTask = buildStopTestTask(callTime, count, stopMark, stopTime);
 
-        TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, FLAKE_ID_GENERATOR.newId()), "t1", Lists.newArrayList(criticalTask)), new CompletableFuture<>());
 
@@ -154,7 +154,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
     }
 
     public void testThrowException() throws InterruptedException {
-        TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         AtomicBoolean stopMark = new AtomicBoolean(false);
 
@@ -228,7 +228,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
 
         LOGGER.info("task size is : " + taskGroup.getTasks().size());
 
-        TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(taskGroup, new CompletableFuture<>());
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index e792783a2..d7c23b6e7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -55,17 +55,17 @@ public class CheckpointPlanTest extends AbstractSeaTunnelServerTest {
         config.setName("test");
 
         JobImmutableInformation jobInfo = new JobImmutableInformation(1,
-            NODE_ENGINE.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+            nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
 
-        IMap<Object, Object> runningJobState = NODE_ENGINE.getHazelcastInstance().getMap("testRunningJobState");
+        IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("testRunningJobState");
         IMap<Object, Long[]> runningJobStateTimestamp =
-            NODE_ENGINE.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+            nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
 
-        Map<Integer, CheckpointPlan> checkpointPlans = PlanUtils.fromLogicalDAG(logicalDag, NODE_ENGINE,
+        Map<Integer, CheckpointPlan> checkpointPlans = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
             jobInfo,
             System.currentTimeMillis(),
             Executors.newCachedThreadPool(),
-            INSTANCE.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+            instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
             runningJobState,
             runningJobStateTimestamp).f1();
         Assertions.assertNotNull(checkpointPlans);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index c0a655456..d12c2dac3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -59,11 +59,11 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
         config.setName("test");
 
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
-            NODE_ENGINE.getSerializationService().toData(testLogicalDag), config, Collections.emptyList());
+            nodeEngine.getSerializationService().toData(testLogicalDag), config, Collections.emptyList());
 
         PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
-            SERVER.getCoordinatorService().submitJob(jobImmutableInformation.getJobId(),
-                NODE_ENGINE.getSerializationService().toData(jobImmutableInformation));
+            server.getCoordinatorService().submitJob(jobImmutableInformation.getJobId(),
+                nodeEngine.getSerializationService().toData(jobImmutableInformation));
 
         Assertions.assertNotNull(voidPassiveCompletableFuture);
     }
@@ -96,17 +96,17 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
         config.setName("test");
 
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
-            NODE_ENGINE.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+            nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
 
-        IMap<Object, Object> runningJobState = NODE_ENGINE.getHazelcastInstance().getMap("testRunningJobState");
+        IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("testRunningJobState");
         IMap<Object, Long[]> runningJobStateTimestamp =
-            NODE_ENGINE.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+            nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
 
-        PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, NODE_ENGINE,
+        PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
             jobImmutableInformation,
             System.currentTimeMillis(),
             Executors.newCachedThreadPool(),
-            INSTANCE.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+            instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
             runningJobState,
             runningJobStateTimestamp).f0();
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 39445fab1..90e8081dd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -36,6 +36,7 @@ import com.hazelcast.map.IMap;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit;
  * JobMaster Tester.
  */
 @DisabledOnOs(OS.WINDOWS)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class JobMasterTest extends AbstractSeaTunnelServerTest {
     private static Long JOB_ID;
 
@@ -91,9 +93,9 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
     private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
 
     @BeforeAll
-    public static void before() {
-        AbstractSeaTunnelServerTest.before();
-        JOB_ID = INSTANCE.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+    public void before() {
+        super.before();
+        JOB_ID = instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
     }
 
     @Test
@@ -102,16 +104,16 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
             TestUtils.createTestLogicalPlan("stream_fakesource_to_file.conf", "test_clear_coordinator_service", JOB_ID);
 
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(JOB_ID,
-            NODE_ENGINE.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
+            nodeEngine.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
             Collections.emptyList());
 
-        Data data = NODE_ENGINE.getSerializationService().toData(jobImmutableInformation);
+        Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
 
         PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
-            SERVER.getCoordinatorService().submitJob(JOB_ID, data);
+            server.getCoordinatorService().submitJob(JOB_ID, data);
         voidPassiveCompletableFuture.join();
 
-        JobMaster jobMaster = SERVER.getCoordinatorService().getJobMaster(JOB_ID);
+        JobMaster jobMaster = server.getCoordinatorService().getJobMaster(JOB_ID);
 
         // waiting for job status turn to running
         await().atMost(60000, TimeUnit.MILLISECONDS)
@@ -140,10 +142,10 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
     }
 
     private void testIMapRemovedAfterJobComplete(JobMaster jobMaster) {
-        runningJobInfoIMap = NODE_ENGINE.getHazelcastInstance().getMap("runningJobInfo");
-        runningJobStateIMap = NODE_ENGINE.getHazelcastInstance().getMap("runningJobState");
-        runningJobStateTimestampsIMap = NODE_ENGINE.getHazelcastInstance().getMap("stateTimestamps");
-        ownedSlotProfilesIMap = NODE_ENGINE.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
+        runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
+        runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+        runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
+        ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
 
         await().atMost(60000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
index 03741c62f..9ea64c127 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
@@ -33,15 +33,15 @@ import java.util.concurrent.ExecutionException;
 
 public class ResourceManagerTest extends AbstractSeaTunnelServerTest {
 
-    private static ResourceManager RESOURCE_MANAGER;
+    private ResourceManager resourceManager;
 
     private final long jobId = 5;
 
     @BeforeAll
-    public static void before() {
-        AbstractSeaTunnelServerTest.before();
-        RESOURCE_MANAGER = SERVER.getCoordinatorService().getResourceManager();
-        SERVER.getSlotService();
+    public void before() {
+        super.before();
+        resourceManager = server.getCoordinatorService().getResourceManager();
+        server.getSlotService();
     }
 
     @Test
@@ -50,19 +50,19 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest {
         resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(100)));
         resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(200)));
         resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(300)));
-        List<SlotProfile> slotProfiles = RESOURCE_MANAGER.applyResources(jobId, resourceProfiles).get();
+        List<SlotProfile> slotProfiles = resourceManager.applyResources(jobId, resourceProfiles).get();
 
         Assertions.assertEquals(resourceProfiles.get(0).getHeapMemory().getBytes(), slotProfiles.get(0).getResourceProfile().getHeapMemory().getBytes());
         Assertions.assertEquals(resourceProfiles.get(1).getHeapMemory().getBytes(), slotProfiles.get(1).getResourceProfile().getHeapMemory().getBytes());
         Assertions.assertEquals(resourceProfiles.get(2).getHeapMemory().getBytes(), slotProfiles.get(2).getResourceProfile().getHeapMemory().getBytes());
 
-        Assertions.assertThrows(ExecutionException.class, () -> RESOURCE_MANAGER.releaseResources(jobId + 1, slotProfiles).get());
+        Assertions.assertThrows(ExecutionException.class, () -> resourceManager.releaseResources(jobId + 1, slotProfiles).get());
 
-        RESOURCE_MANAGER.releaseResources(jobId, slotProfiles).get();
+        resourceManager.releaseResources(jobId, slotProfiles).get();
 
-        Assertions.assertThrows(ExecutionException.class, () -> RESOURCE_MANAGER.releaseResources(jobId, slotProfiles).get());
+        Assertions.assertThrows(ExecutionException.class, () -> resourceManager.releaseResources(jobId, slotProfiles).get());
 
-        Assertions.assertThrows(ExecutionException.class, () -> RESOURCE_MANAGER.applyResource(jobId, new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE))).get());
+        Assertions.assertThrows(ExecutionException.class, () -> resourceManager.applyResource(jobId, new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE))).get());
     }
 
 }