You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/10/08 15:24:37 UTC
[incubator-seatunnel] branch dev updated: [Engine] [Test] Fix engine e2e problem (#3019)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5f098a06e [Engine] [Test] Fix engine e2e problem (#3019)
5f098a06e is described below
commit 5f098a06ec85ad4100598c4b5793ce8d0591f015
Author: Hisoka <fa...@qq.com>
AuthorDate: Sat Oct 8 23:24:31 2022 +0800
[Engine] [Test] Fix engine e2e problem (#3019)
* [Engine] [Test] Fix engine e2e problem
* [Engine] [Test] Fix engine e2e problem
---
.../container/seatunnel/SeaTunnelContainer.java | 123 +++++++++++++++++++++
.../engine/e2e/console/FakeSourceToConsoleIT.java | 2 -
.../src/test/resources/fakesource_to_console.conf | 8 +-
.../connector-seatunnel-e2e-base/pom.xml | 14 +++
.../seatunnel/engine/e2e/JobExecutionIT.java | 7 +-
.../seatunnel/engine/e2e/SeaTunnelContainer.java | 7 +-
.../org/apache/seatunnel/engine/e2e/TestUtils.java | 37 +++----
.../test/resources/batch_fakesource_to_file.conf | 9 +-
.../batch_fakesource_to_file_complex.conf | 18 ++-
.../streaming_fakesource_to_file_complex.conf | 14 ++-
seatunnel-e2e/seatunnel-engine-e2e/pom.xml | 7 ++
.../engine/server/task/flow/SinkFlowLifeCycle.java | 12 +-
.../engine/server/AbstractSeaTunnelServerTest.java | 24 ++--
.../engine/server/TaskExecutionServiceTest.java | 16 +--
.../server/checkpoint/CheckpointPlanTest.java | 10 +-
.../seatunnel/engine/server/dag/TaskTest.java | 16 +--
.../engine/server/master/JobMasterTest.java | 24 ++--
.../resourcemanager/ResourceManagerTest.java | 20 ++--
18 files changed, 271 insertions(+), 97 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
new file mode 100644
index 000000000..1ce6fb22d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.seatunnel;
+
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+@NoArgsConstructor
+//@AutoService(TestContainer.class)
+// TODO add AutoService after engine feature is ready
+public class SeaTunnelContainer extends AbstractTestContainer { // Starts one container running SERVER_SHELL and executes jobs against it.
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelContainer.class);
+ private static final String JDK_DOCKER_IMAGE = "openjdk:8"; // value returned by getDockerImage()
+ protected static final Network NETWORK = Network.newNetwork(); // static, so one network is shared by all instances of this class
+
+ private static final String SEATUNNEL_HOME = "/tmp/seatunnel"; // base of the command path used inside the container
+ private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString(); // NOTE(review): Paths.get uses the host's separator; confirm this is only run from Unix-like hosts
+ private static final String CLIENT_SHELL = "seatunnel.sh"; // value returned by getStartShellName()
+ private static final String SERVER_SHELL = "seatunnel-cluster.sh"; // used as the container command in startUp()
+ private GenericContainer<?> server; // assigned in startUp(); null until then
+
+ @Override
+ public void startUp() throws Exception { // builds and starts the server container, then copies the starter and applies extra commands
+ server = new GenericContainer<>(getDockerImage())
+ .withNetwork(NETWORK)
+ .withCommand(Paths.get(SEATUNNEL_BIN, SERVER_SHELL).toString()) // container command is SEATUNNEL_BIN/SERVER_SHELL
+ .withNetworkAliases("server")
+ .withExposedPorts() // called with no ports listed
+ .withLogConsumer(new Slf4jLogConsumer(LOG)) // forwards container output to LOG
+ .waitingFor(Wait.forLogMessage(".*received new worker register.*\\n", 1)); // wait strategy: this log line must appear once
+ server.start();
+ copySeaTunnelStarter(server); // NOTE(review): runs after start(), yet the command above already points into SEATUNNEL_BIN; confirm the script exists in the container by then
+ // execute extra commands
+ executeExtraCommands(server);
+ }
+
+ @Override
+ public void tearDown() throws Exception { // closes the server container; does nothing when startUp() never assigned it
+ if (server != null) {
+ server.close();
+ }
+ }
+
+ @Override
+ protected String getDockerImage() { // image used to build the server container in startUp()
+ return JDK_DOCKER_IMAGE;
+ }
+
+ @Override
+ protected String getStartModuleName() { // how the base class uses this value is not visible in this file
+ return "seatunnel-starter";
+ }
+
+ @Override
+ protected String getStartShellName() { // returns the client script name, not SERVER_SHELL
+ return CLIENT_SHELL;
+ }
+
+ @Override
+ protected String getConnectorModulePath() { // presumably the module directory searched for connector jars; verify against the base class
+ return "seatunnel-connectors-v2";
+ }
+
+ @Override
+ protected String getConnectorType() { // how the base class uses this value is not visible in this file
+ return "seatunnel";
+ }
+
+ @Override
+ protected String getConnectorNamePrefix() { // presumably the file-name prefix of connector jars; verify against the base class
+ return "connector-";
+ }
+
+ @Override
+ protected List<String> getExtraStartShellCommands() { // no extra start-shell commands for this container
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String identifier() { // constant identifier string for this container implementation
+ return "SeaTunnel";
+ }
+
+ @Override
+ public void executeExtraCommands(ContainerExtendedFactory extendedFactory) throws IOException, InterruptedException { // hands the server container to the supplied factory
+ extendedFactory.extend(server);
+ }
+
+ @Override
+ public Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException { // delegates to the two-argument executeJob overload with the server container
+ return executeJob(server, confFile);
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
index 5adc84ea6..832be6fc1 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
@@ -20,13 +20,11 @@ package org.apache.seatunnel.engine.e2e.console;
import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import java.io.IOException;
-@Disabled("Disabled because connector-v2 jar dist not exist")
public class FakeSourceToConsoleIT extends SeaTunnelContainer {
@Test
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
index 35f8633ef..3b124fe8b 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
@@ -26,7 +26,13 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ }
}
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index 03f7c66bf..ca51d8d6a 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -32,6 +32,20 @@
<maven-jar-plugin.version>2.4</maven-jar-plugin.version>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
<build>
<plugins>
<plugin>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 03d2cca0c..ada5c4fac 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -36,14 +36,13 @@ import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
-@Disabled("Disabled because connector-v2 jar dist not exist")
public class JobExecutionIT {
@BeforeAll
public static void beforeClass() throws Exception {
@@ -67,7 +66,7 @@ public class JobExecutionIT {
}
@Test
- public void testExecuteJob() {
+ public void testExecuteJob() throws IOException {
TestUtils.initPluginDir();
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
@@ -96,7 +95,7 @@ public class JobExecutionIT {
}
@Test
- public void cancelJobTest() {
+ public void cancelJobTest() throws IOException {
TestUtils.initPluginDir();
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/streaming_fakesource_to_file_complex.conf");
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
index 4fd45e6cb..d0d9e65e4 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.e2e;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
@@ -79,10 +81,6 @@ public abstract class SeaTunnelContainer {
mountMapping.put(PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-starter/src/main/bin/",
Paths.get(SEATUNNEL_BIN).toString());
- // copy connectors
- mountMapping.put(PROJECT_ROOT_PATH +
- "/seatunnel-connectors-v2-dist/target/lib", Paths.get(SEATUNNEL_CONNECTORS, "seatunnel").toString());
-
// copy plugin-mapping.properties
mountMapping.put(PROJECT_ROOT_PATH + "/plugin-mapping.properties", Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
@@ -91,6 +89,7 @@ public abstract class SeaTunnelContainer {
@SuppressWarnings("checkstyle:MagicNumber")
public Container.ExecResult executeSeaTunnelJob(String confFile) throws IOException, InterruptedException {
+ ContainerUtil.copyConnectorJarToContainer(SERVER, confFile, "seatunnel-connectors-v2", "connector-", "seatunnel", SEATUNNEL_HOME);
final String confPath = getResource(confFile);
if (!new File(confPath).exists()) {
throw new IllegalArgumentException(confFile + " doesn't exist");
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
index 1e1b6a407..e115b5af8 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -19,11 +19,13 @@ package org.apache.seatunnel.engine.e2e;
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
-import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class TestUtils {
public static String getResource(String confFile) {
@@ -34,7 +36,8 @@ public class TestUtils {
return System.getProperty("user.name") + "_" + testClassName;
}
- public static void initPluginDir() {
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static void initPluginDir() throws IOException {
// TODO change connector get method
// copy connectors to project_root/connectors dir
System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
@@ -54,23 +57,19 @@ public class TestUtils {
File connectorDistDir = new File(
seatunnelRootDir +
File.separator +
- "seatunnel-connectors-v2-dist" +
- File.separator +
- "target" +
- File.separator +
- "lib");
-
- Arrays.stream(connectorDistDir.listFiles()).forEach(file -> {
- if (file.getName().startsWith("connector-")) {
- Path copied = Paths.get(connectorDir + File.separator + file.getName());
- Path originalPath = file.toPath();
- try {
- Files.copy(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
+ "seatunnel-connectors-v2");
+ try (Stream<Path> paths = Files.walk(connectorDistDir.toPath(), 6, FileVisitOption.FOLLOW_LINKS)) {
+ paths.filter(path -> path.getFileName().toFile().getName().startsWith("connector-") && path.getFileName().toFile().getName().endsWith(".jar") && !path.getFileName().toString().contains("javadoc"))
+ .collect(Collectors.toSet()).forEach(file -> {
+ Path copied = Paths.get(connectorDir + File.separator + file.toFile().getName());
+ Path originalPath = file.toAbsolutePath();
+ try {
+ Files.copy(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
Path targetPluginMappingFile = Paths.get(seatunnelRootDir +
File.separator +
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index 3cfb808b7..8c78ced4c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -30,8 +30,13 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age",
- parallelism = 3
+ schema {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
}
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index 2776e3fd1..ddc450819 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -30,14 +30,24 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age",
- parallelism = 3
+ schema {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
}
FakeSource {
result_table_name = "fake"
- field_name = "name,age",
- parallelism = 3
+ schema {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
}
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index 1fec906eb..d454ac791 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -30,13 +30,23 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age",
+ schema {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
parallelism = 1
}
FakeSource {
result_table_name = "fake"
- field_name = "name,age",
+ schema {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
parallelism = 1
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
index cfda3472d..188cd8a19 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
@@ -47,6 +47,13 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-e2e-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-client</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 684ed7904..6b764aea0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -124,18 +124,18 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
prepareClose = true;
}
if (barrier.snapshot()) {
- if (!writerStateSerializer.isPresent()) {
- runningTask.addState(barrier, sinkAction.getId(), Collections.emptyList());
- } else {
- List<StateT> states = writer.snapshotState(barrier.getId());
- runningTask.addState(barrier, sinkAction.getId(), serializeStates(writerStateSerializer.get(), states));
- }
try {
lastCommitInfo = writer.prepareCommit();
} catch (Exception e) {
writer.abortPrepare();
throw e;
}
+ if (!writerStateSerializer.isPresent()) {
+ runningTask.addState(barrier, sinkAction.getId(), Collections.emptyList());
+ } else {
+ List<StateT> states = writer.snapshotState(barrier.getId());
+ runningTask.addState(barrier, sinkAction.getId(), serializeStates(writerStateSerializer.get(), states));
+ }
if (containAggCommitter) {
lastCommitInfo.ifPresent(commitInfoT -> runningTask.getExecutionContext().sendToMember(new SinkPrepareCommitOperation(barrier, committerTaskLocation,
SerializationUtils.serialize(commitInfoT)), committerTaskAddress).join());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index d6458467a..1d1e0105b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -22,28 +22,30 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractSeaTunnelServerTest {
- protected static SeaTunnelServer SERVER;
+ protected SeaTunnelServer server;
- protected static NodeEngine NODE_ENGINE;
+ protected NodeEngine nodeEngine;
- protected static HazelcastInstanceImpl INSTANCE;
+ protected HazelcastInstanceImpl instance;
protected static ILogger LOGGER;
@BeforeAll
- public static void before() {
- INSTANCE = TestUtils.createHazelcastInstance("AbstractSeaTunnelServerTest" + "_" + System.currentTimeMillis());
- NODE_ENGINE = INSTANCE.node.nodeEngine;
- SERVER = NODE_ENGINE.getService(SeaTunnelServer.SERVICE_NAME);
- LOGGER = NODE_ENGINE.getLogger(AbstractSeaTunnelServerTest.class);
+ public void before() {
+ instance = TestUtils.createHazelcastInstance("AbstractSeaTunnelServerTest" + "_" + System.currentTimeMillis());
+ nodeEngine = instance.node.nodeEngine;
+ server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+ LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
}
@AfterAll
- public static void after() {
- SERVER.shutdown(true);
- INSTANCE.shutdown();
+ public void after() {
+ server.shutdown(true);
+ instance.shutdown();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 20a5ffc25..80d1cb42e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -55,9 +55,9 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
int pipeLineId = 100001;
@BeforeAll
- public static void before() {
- AbstractSeaTunnelServerTest.before();
- FLAKE_ID_GENERATOR = INSTANCE.getFlakeIdGenerator("test");
+ public void before() {
+ super.before();
+ FLAKE_ID_GENERATOR = instance.getFlakeIdGenerator("test");
}
@Test
@@ -82,7 +82,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
}
public void testCancel() {
- TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
long sleepTime = 300;
@@ -100,7 +100,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
}
public void testFinish() {
- TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
long sleepTime = 300;
@@ -134,7 +134,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
//Create tasks with critical delays
List<Task> criticalTask = buildStopTestTask(callTime, count, stopMark, stopTime);
- TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(new TaskGroupLocation(jobId, pipeLineId, FLAKE_ID_GENERATOR.newId()), "t1", Lists.newArrayList(criticalTask)), new CompletableFuture<>());
@@ -154,7 +154,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
}
public void testThrowException() throws InterruptedException {
- TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
AtomicBoolean stopMark = new AtomicBoolean(false);
@@ -228,7 +228,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
LOGGER.info("task size is : " + taskGroup.getTasks().size());
- TaskExecutionService taskExecutionService = SERVER.getTaskExecutionService();
+ TaskExecutionService taskExecutionService = server.getTaskExecutionService();
CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(taskGroup, new CompletableFuture<>());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index e792783a2..d7c23b6e7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -55,17 +55,17 @@ public class CheckpointPlanTest extends AbstractSeaTunnelServerTest {
config.setName("test");
JobImmutableInformation jobInfo = new JobImmutableInformation(1,
- NODE_ENGINE.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+ nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
- IMap<Object, Object> runningJobState = NODE_ENGINE.getHazelcastInstance().getMap("testRunningJobState");
+ IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("testRunningJobState");
IMap<Object, Long[]> runningJobStateTimestamp =
- NODE_ENGINE.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+ nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
- Map<Integer, CheckpointPlan> checkpointPlans = PlanUtils.fromLogicalDAG(logicalDag, NODE_ENGINE,
+ Map<Integer, CheckpointPlan> checkpointPlans = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
jobInfo,
System.currentTimeMillis(),
Executors.newCachedThreadPool(),
- INSTANCE.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+ instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
runningJobState,
runningJobStateTimestamp).f1();
Assertions.assertNotNull(checkpointPlans);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index c0a655456..d12c2dac3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -59,11 +59,11 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
config.setName("test");
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
- NODE_ENGINE.getSerializationService().toData(testLogicalDag), config, Collections.emptyList());
+ nodeEngine.getSerializationService().toData(testLogicalDag), config, Collections.emptyList());
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- SERVER.getCoordinatorService().submitJob(jobImmutableInformation.getJobId(),
- NODE_ENGINE.getSerializationService().toData(jobImmutableInformation));
+ server.getCoordinatorService().submitJob(jobImmutableInformation.getJobId(),
+ nodeEngine.getSerializationService().toData(jobImmutableInformation));
Assertions.assertNotNull(voidPassiveCompletableFuture);
}
@@ -96,17 +96,17 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
config.setName("test");
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
- NODE_ENGINE.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+ nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
- IMap<Object, Object> runningJobState = NODE_ENGINE.getHazelcastInstance().getMap("testRunningJobState");
+ IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("testRunningJobState");
IMap<Object, Long[]> runningJobStateTimestamp =
- NODE_ENGINE.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+ nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
- PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, NODE_ENGINE,
+ PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
jobImmutableInformation,
System.currentTimeMillis(),
Executors.newCachedThreadPool(),
- INSTANCE.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+ instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
runningJobState,
runningJobStateTimestamp).f0();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 39445fab1..90e8081dd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -36,6 +36,7 @@ import com.hazelcast.map.IMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit;
* JobMaster Tester.
*/
@DisabledOnOs(OS.WINDOWS)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class JobMasterTest extends AbstractSeaTunnelServerTest {
private static Long JOB_ID;
@@ -91,9 +93,9 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
@BeforeAll
- public static void before() {
- AbstractSeaTunnelServerTest.before();
- JOB_ID = INSTANCE.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+ public void before() {
+ super.before();
+ JOB_ID = instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
}
@Test
@@ -102,16 +104,16 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
TestUtils.createTestLogicalPlan("stream_fakesource_to_file.conf", "test_clear_coordinator_service", JOB_ID);
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(JOB_ID,
- NODE_ENGINE.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
+ nodeEngine.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
Collections.emptyList());
- Data data = NODE_ENGINE.getSerializationService().toData(jobImmutableInformation);
+ Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- SERVER.getCoordinatorService().submitJob(JOB_ID, data);
+ server.getCoordinatorService().submitJob(JOB_ID, data);
voidPassiveCompletableFuture.join();
- JobMaster jobMaster = SERVER.getCoordinatorService().getJobMaster(JOB_ID);
+ JobMaster jobMaster = server.getCoordinatorService().getJobMaster(JOB_ID);
// waiting for job status turn to running
await().atMost(60000, TimeUnit.MILLISECONDS)
@@ -140,10 +142,10 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
}
private void testIMapRemovedAfterJobComplete(JobMaster jobMaster) {
- runningJobInfoIMap = NODE_ENGINE.getHazelcastInstance().getMap("runningJobInfo");
- runningJobStateIMap = NODE_ENGINE.getHazelcastInstance().getMap("runningJobState");
- runningJobStateTimestampsIMap = NODE_ENGINE.getHazelcastInstance().getMap("stateTimestamps");
- ownedSlotProfilesIMap = NODE_ENGINE.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
+ runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
+ runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+ runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
+ ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
index 03741c62f..9ea64c127 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
@@ -33,15 +33,15 @@ import java.util.concurrent.ExecutionException;
public class ResourceManagerTest extends AbstractSeaTunnelServerTest {
- private static ResourceManager RESOURCE_MANAGER;
+ private ResourceManager resourceManager;
private final long jobId = 5;
@BeforeAll
- public static void before() {
- AbstractSeaTunnelServerTest.before();
- RESOURCE_MANAGER = SERVER.getCoordinatorService().getResourceManager();
- SERVER.getSlotService();
+ public void before() {
+ super.before();
+ resourceManager = server.getCoordinatorService().getResourceManager();
+ server.getSlotService();
}
@Test
@@ -50,19 +50,19 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest {
resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(100)));
resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(200)));
resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(300)));
- List<SlotProfile> slotProfiles = RESOURCE_MANAGER.applyResources(jobId, resourceProfiles).get();
+ List<SlotProfile> slotProfiles = resourceManager.applyResources(jobId, resourceProfiles).get();
Assertions.assertEquals(resourceProfiles.get(0).getHeapMemory().getBytes(), slotProfiles.get(0).getResourceProfile().getHeapMemory().getBytes());
Assertions.assertEquals(resourceProfiles.get(1).getHeapMemory().getBytes(), slotProfiles.get(1).getResourceProfile().getHeapMemory().getBytes());
Assertions.assertEquals(resourceProfiles.get(2).getHeapMemory().getBytes(), slotProfiles.get(2).getResourceProfile().getHeapMemory().getBytes());
- Assertions.assertThrows(ExecutionException.class, () -> RESOURCE_MANAGER.releaseResources(jobId + 1, slotProfiles).get());
+ Assertions.assertThrows(ExecutionException.class, () -> resourceManager.releaseResources(jobId + 1, slotProfiles).get());
- RESOURCE_MANAGER.releaseResources(jobId, slotProfiles).get();
+ resourceManager.releaseResources(jobId, slotProfiles).get();
- Assertions.assertThrows(ExecutionException.class, () -> RESOURCE_MANAGER.releaseResources(jobId, slotProfiles).get());
+ Assertions.assertThrows(ExecutionException.class, () -> resourceManager.releaseResources(jobId, slotProfiles).get());
- Assertions.assertThrows(ExecutionException.class, () -> RESOURCE_MANAGER.applyResource(jobId, new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE))).get());
+ Assertions.assertThrows(ExecutionException.class, () -> resourceManager.applyResource(jobId, new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE))).get());
}
}