You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/09 08:00:54 UTC
[incubator-seatunnel] branch dev updated: [Improve][e2e] Container only copy required connector jars (#2675)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5039752ea [Improve][e2e] Container only copy required connector jars (#2675)
5039752ea is described below
commit 5039752eacc8702477b77e5b9b0946ad87125f9b
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Fri Sep 9 16:00:48 2022 +0800
[Improve][e2e] Container only copy required connector jars (#2675)
* [Improve][e2e] flink container only copy required connector jars
* rename flink-e2e-common
* remove unused imports
* [Improve][e2e] flink sql container refactoring
* remove unused imports
* remove useless
* [Improve][e2e] spark container only copy required connector jars
* change for code review
* Use e2e-common module directly
* checkstyle
* code format
---
.../api/configuration/ReadonlyConfig.java | 12 +-
seatunnel-e2e/pom.xml | 8 +-
.../pom.xml | 14 +-
.../seatunnel/e2e/common/AbstractContainer.java | 103 +++++++++++++
.../e2e/common/AbstractFlinkContainer.java | 121 +++++++++++++++
.../e2e/common/AbstractSparkContainer.java | 87 +++++++++++
.../apache/seatunnel/e2e/common/ContainerUtil.java | 165 +++++++++++++++++++++
.../connector-flink-e2e-base/pom.xml | 11 +-
.../apache/seatunnel/e2e/flink/FlinkContainer.java | 150 +++----------------
.../seatunnel-flink-connector-v2-e2e/pom.xml | 2 +-
.../seatunnel-connector-flink-e2e-base/pom.xml | 12 +-
.../apache/seatunnel/e2e/flink/FlinkContainer.java | 149 +++----------------
.../setunnel-connector-flink-sql-e2e-base/pom.xml | 11 +-
.../seatunnel/e2e/flink/sql/FlinkContainer.java | 129 +++-------------
.../e2e/flink/sql/fake/DatagenToConsoleIT.java | 5 +-
.../connector-spark-e2e-base/pom.xml | 11 +-
.../apache/seatunnel/e2e/spark/SparkContainer.java | 143 +++---------------
.../seatunnel-spark-connector-v2-e2e/pom.xml | 2 +-
.../seatunnel-connector-spark-e2e-base/pom.xml | 11 +-
.../apache/seatunnel/e2e/spark/SparkContainer.java | 139 +++--------------
20 files changed, 633 insertions(+), 652 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index beb9d08a9..32e8db273 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -66,18 +66,24 @@ public class ReadonlyConfig {
return getOptional(option).orElseGet(option::defaultValue);
}
- @SuppressWarnings("MagicNumber")
public Map<String, String> toMap() {
if (confData.isEmpty()) {
return Collections.emptyMap();
}
+ Map<String, String> result = new HashMap<>();
+ toMap(result);
+ return result;
+ }
+
+ public void toMap(Map<String, String> result) {
+ if (confData.isEmpty()) {
+ return;
+ }
Map<String, Object> flatteningMap = flatteningMap(confData);
- Map<String, String> result = new HashMap<>((flatteningMap.size() << 2) / 3 + 1);
for (Map.Entry<String, Object> entry : flatteningMap.entrySet()) {
result.put(entry.getKey(), convertToJsonString(entry.getValue()));
}
- return result;
}
@SuppressWarnings("unchecked")
diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml
index c9b0b975a..c8bb6374e 100644
--- a/seatunnel-e2e/pom.xml
+++ b/seatunnel-e2e/pom.xml
@@ -27,15 +27,17 @@
<packaging>pom</packaging>
<modules>
- <module>seatunnel-flink-e2e</module>
- <module>seatunnel-spark-e2e</module>
+ <module>seatunnel-e2e-common</module>
<module>seatunnel-flink-connector-v2-e2e</module>
- <module>seatunnel-spark-connector-v2-e2e</module>
+ <module>seatunnel-flink-e2e</module>
<module>seatunnel-flink-sql-e2e</module>
+ <module>seatunnel-spark-connector-v2-e2e</module>
+ <module>seatunnel-spark-e2e</module>
</modules>
<properties>
<junit4.version>4.13.2</junit4.version>
+ <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-e2e-common/pom.xml
similarity index 84%
copy from seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
copy to seatunnel-e2e/seatunnel-e2e-common/pom.xml
index 2cd8cacdb..3ddc46cb5 100644
--- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-e2e-common/pom.xml
@@ -17,17 +17,21 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-flink-sql-e2e</artifactId>
+ <artifactId>seatunnel-e2e</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>setunnel-connector-flink-sql-e2e-base</artifactId>
+ <artifactId>seatunnel-e2e-common</artifactId>
- <properties>
- <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
- </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
<build>
<plugins>
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
new file mode 100644
index 000000000..f4fb9d116
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common;
+
+import static org.apache.seatunnel.e2e.common.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.apache.seatunnel.e2e.common.ContainerUtil.adaptPathForWin;
+import static org.apache.seatunnel.e2e.common.ContainerUtil.copyConfigFileToContainer;
+import static org.apache.seatunnel.e2e.common.ContainerUtil.copyConnectorJarToContainer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class AbstractContainer {
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractContainer.class);
+ protected static final String START_ROOT_MODULE_NAME = "seatunnel-core";
+
+ protected final String startModuleName;
+
+ protected final String startModuleFullPath;
+
+ public AbstractContainer() {
+ String[] modules = getStartModulePath().split(File.separator);
+ this.startModuleName = modules[modules.length - 1];
+ this.startModuleFullPath = PROJECT_ROOT_PATH + File.separator +
+ START_ROOT_MODULE_NAME + File.separator + getStartModulePath();
+ }
+
+ protected abstract String getDockerImage();
+
+ protected abstract String getStartModulePath();
+
+ protected abstract String getStartShellName();
+
+ protected abstract String getConnectorModulePath();
+
+ protected abstract String getConnectorType();
+
+ protected abstract String getConnectorNamePrefix();
+
+ protected abstract String getSeaTunnelHomeInContainer();
+
+ protected abstract List<String> getExtraStartShellCommands();
+
+ protected void copySeaTunnelStarter(GenericContainer<?> container) {
+ String[] modules = getStartModulePath().split(File.separator);
+ final String startModuleName = modules[modules.length - 1];
+ ContainerUtil.copySeaTunnelStarter(container,
+ startModuleName,
+ PROJECT_ROOT_PATH + File.separator + START_ROOT_MODULE_NAME + File.separator + getStartModulePath(),
+ getSeaTunnelHomeInContainer(),
+ getStartShellName());
+ }
+
+ protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile) throws IOException, InterruptedException {
+ final String confInContainerPath = copyConfigFileToContainer(container, confFile);
+ // copy connectors
+ copyConnectorJarToContainer(container,
+ confFile,
+ getConnectorModulePath(),
+ getConnectorNamePrefix(),
+ getConnectorType(),
+ getSeaTunnelHomeInContainer());
+ return executeCommand(container, confInContainerPath);
+ }
+
+ protected Container.ExecResult executeCommand(GenericContainer<?> container, String configPath) throws IOException, InterruptedException {
+ final List<String> command = new ArrayList<>();
+ String binPath = Paths.get(getSeaTunnelHomeInContainer(), "bin", getStartShellName()).toString();
+ // base command
+ command.add(adaptPathForWin(binPath));
+ command.add("--config");
+ command.add(adaptPathForWin(configPath));
+ command.addAll(getExtraStartShellCommands());
+
+ Container.ExecResult execResult = container.execInContainer("bash", "-c", String.join(" ", command));
+ LOG.info(execResult.getStdout());
+ LOG.error(execResult.getStderr());
+ return execResult;
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java
new file mode 100644
index 000000000..1f06648eb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * Base class for SeaTunnel end-to-end tests that run against a Flink cluster.
+ * The {@code before} method starts the Flink job manager and task manager containers, and the {@code close} method stops them.
+ * Use {@link AbstractFlinkContainer#executeSeaTunnelFlinkJob} to submit a SeaTunnel config file and run it as a SeaTunnel job.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractFlinkContainer extends AbstractContainer {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkContainer.class);
+
+ protected static final String FLINK_SEATUNNEL_HOME = "/tmp/flink/seatunnel";
+
+ protected static final Network NETWORK = Network.newNetwork();
+
+ protected static final List<String> DEFAULT_FLINK_PROPERTIES = Arrays.asList(
+ "jobmanager.rpc.address: jobmanager",
+ "taskmanager.numberOfTaskSlots: 10",
+ "parallelism.default: 4",
+ "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false");
+
+ protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
+
+ protected GenericContainer<?> jobManager;
+ protected GenericContainer<?> taskManager;
+
+ @Override
+ protected String getDockerImage() {
+ return DEFAULT_DOCKER_IMAGE;
+ }
+
+ @Override
+ protected String getSeaTunnelHomeInContainer() {
+ return FLINK_SEATUNNEL_HOME;
+ }
+
+ @BeforeAll
+ public void before() {
+ final String dockerImage = getDockerImage();
+ final String properties = String.join("\n", getFlinkProperties());
+ jobManager = new GenericContainer<>(dockerImage)
+ .withCommand("jobmanager")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jobmanager")
+ .withExposedPorts()
+ .withEnv("FLINK_PROPERTIES", properties)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ taskManager =
+ new GenericContainer<>(dockerImage)
+ .withCommand("taskmanager")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("taskmanager")
+ .withEnv("FLINK_PROPERTIES", properties)
+ .dependsOn(jobManager)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ Startables.deepStart(Stream.of(jobManager)).join();
+ Startables.deepStart(Stream.of(taskManager)).join();
+ copySeaTunnelStarter(jobManager);
+ LOG.info("Flink containers are started.");
+ }
+
+ protected List<String> getFlinkProperties() {
+ return DEFAULT_FLINK_PROPERTIES;
+ }
+
+ @AfterAll
+ public void close() {
+ if (taskManager != null) {
+ taskManager.stop();
+ }
+ if (jobManager != null) {
+ jobManager.stop();
+ }
+ }
+
+ @Override
+ protected List<String> getExtraStartShellCommands() {
+ return Collections.emptyList();
+ }
+
+ public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException {
+ return executeJob(jobManager, confFile);
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
new file mode 100644
index 000000000..fc16846c1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractSparkContainer extends AbstractContainer {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkContainer.class);
+
+ private static final String SPARK_SEATUNNEL_HOME = "/tmp/spark/seatunnel";
+ private static final String DEFAULT_DOCKER_IMAGE = "bitnami/spark:2.4.3";
+ public static final Network NETWORK = Network.newNetwork();
+
+ protected GenericContainer<?> master;
+
+ @Override
+ protected String getDockerImage() {
+ return DEFAULT_DOCKER_IMAGE;
+ }
+
+ @Override
+ protected String getSeaTunnelHomeInContainer() {
+ return SPARK_SEATUNNEL_HOME;
+ }
+
+ @BeforeAll
+ public void before() {
+ master = new GenericContainer<>(getDockerImage())
+ .withNetwork(NETWORK)
+ .withNetworkAliases("spark-master")
+ .withExposedPorts()
+ .withEnv("SPARK_MODE", "master")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ // In most cases we can just use standalone mode to execute a Spark job; if we want to use cluster mode, we also need to
+ // start a worker.
+ Startables.deepStart(Stream.of(master)).join();
+ copySeaTunnelStarter(master);
+ LOG.info("Spark container started");
+ }
+
+ @AfterAll
+ public void close() {
+ if (master != null) {
+ master.stop();
+ }
+ }
+
+ @Override
+ protected List<String> getExtraStartShellCommands() {
+ return Arrays.asList("--master local",
+ "--deploy-mode client");
+ }
+
+ public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException {
+ return executeJob(master, confFile);
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java
new file mode 100644
index 000000000..237e30f09
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public final class ContainerUtil {
+
+ public static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
+
+ /**
+ * Project root path taken from the {@code user.dir} system property; it is only correct when the working directory is inside a submodule of seatunnel-e2e.
+ */
+ public static final String PROJECT_ROOT_PATH = System.getProperty("user.dir").split("/seatunnel-e2e/")[0];
+
+ public static void copyConnectorJarToContainer(GenericContainer<?> container,
+ String confFile,
+ String connectorsRootPath,
+ String connectorPrefix,
+ String connectorType,
+ String seatunnelHome) {
+ Config jobConfig = getConfig(getConfigFile(confFile));
+ Config connectorsMapping = getConfig(new File(PROJECT_ROOT_PATH + File.separator + PLUGIN_MAPPING_FILE));
+ if (!connectorsMapping.hasPath(connectorType) || connectorsMapping.getConfig(connectorType).isEmpty()) {
+ return;
+ }
+ Config connectors = connectorsMapping.getConfig(connectorType);
+ Set<String> connectorNames = getConnectors(jobConfig, connectors, "source");
+ connectorNames.addAll(getConnectors(jobConfig, connectors, "sink"));
+ File module = new File(PROJECT_ROOT_PATH + File.separator + connectorsRootPath);
+
+ List<File> connectorFiles = getConnectorFiles(module, connectorNames, connectorPrefix);
+ connectorFiles.forEach(jar ->
+ container.copyFileToContainer(
+ MountableFile.forHostPath(jar.getAbsolutePath()),
+ Paths.get(Paths.get(seatunnelHome, "connectors").toString(), connectorType, jar.getName()).toString()));
+ }
+
+ public static String copyConfigFileToContainer(GenericContainer<?> container, String confFile) {
+ final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
+ container.copyFileToContainer(MountableFile.forHostPath(getConfigFile(confFile).getAbsolutePath()), targetConfInContainer);
+ return targetConfInContainer;
+ }
+
+ public static void copySeaTunnelStarter(GenericContainer<?> container,
+ String startModuleName,
+ String startModulePath,
+ String seatunnelHomeInContainer,
+ String startShellName) {
+ final String startJarName = startModuleName + ".jar";
+ // copy lib
+ final String startJarPath = startModulePath + File.separator + "target" + File.separator + startJarName;
+ container.copyFileToContainer(
+ MountableFile.forHostPath(startJarPath),
+ Paths.get(Paths.get(seatunnelHomeInContainer, "lib").toString(), startJarName).toString());
+
+ // copy bin
+ final String startBinPath = startModulePath + File.separator + "/src/main/bin/" + startShellName;
+ container.copyFileToContainer(
+ MountableFile.forHostPath(startBinPath),
+ Paths.get(Paths.get(seatunnelHomeInContainer, "bin").toString(), startShellName).toString());
+
+ // copy plugin-mapping.properties
+ container.copyFileToContainer(
+ MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
+ Paths.get(Paths.get(seatunnelHomeInContainer, "connectors").toString(), PLUGIN_MAPPING_FILE).toString());
+ }
+
+ public static String adaptPathForWin(String path) {
+ // Running IT use cases under Windows requires replacing \ with /
+ return path == null ? "" : path.replaceAll("\\\\", "/");
+ }
+
+ private static List<File> getConnectorFiles(File currentModule, Set<String> connectorNames, String connectorPrefix) {
+ List<File> connectorFiles = new ArrayList<>();
+ for (File file : Objects.requireNonNull(currentModule.listFiles())) {
+ getConnectorFiles(file, connectorNames, connectorPrefix, connectorFiles);
+ }
+ return connectorFiles;
+ }
+
+ private static void getConnectorFiles(File currentModule, Set<String> connectorNames, String connectorPrefix, List<File> connectors) {
+ if (currentModule.isFile() || connectorNames.size() == connectors.size()) {
+ return;
+ }
+ if (connectorNames.contains(currentModule.getName())) {
+ File targetPath = new File(currentModule.getAbsolutePath() + File.separator + "target");
+ for (File file : Objects.requireNonNull(targetPath.listFiles())) {
+ if (file.getName().startsWith(currentModule.getName()) && !file.getName().endsWith("javadoc.jar")) {
+ connectors.add(file);
+ return;
+ }
+ }
+ }
+
+ if (currentModule.getName().startsWith(connectorPrefix)) {
+ for (File file : Objects.requireNonNull(currentModule.listFiles())) {
+ getConnectorFiles(file, connectorNames, connectorPrefix, connectors);
+ }
+ }
+ }
+
+ private static Set<String> getConnectors(Config jobConfig, Config connectorsMap, String pluginType) {
+ List<? extends Config> connectorConfigList = jobConfig.getConfigList(pluginType);
+ Map<String, String> connectors = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ ReadonlyConfig.fromConfig(connectorsMap.getConfig(pluginType)).toMap(connectors);
+ return connectorConfigList.stream()
+ .map(config -> config.getString("plugin_name"))
+ .filter(connectors::containsKey)
+ .map(connectors::get)
+ .collect(Collectors.toSet());
+ }
+
+ public static Path getCurrentModulePath() {
+ return Paths.get(System.getProperty("user.dir"));
+ }
+
+ private static File getConfigFile(String confFile) {
+ File file = new File(getCurrentModulePath() + "/src/test/resources" + confFile);
+ if (file.exists()) {
+ return file;
+ }
+ throw new IllegalArgumentException(confFile + " doesn't exist");
+ }
+
+ private static Config getConfig(File file) {
+ return ConfigFactory
+ .parseFile(file)
+ .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml
index 57a12fa6c..d2f48c420 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml
@@ -25,9 +25,14 @@
<artifactId>connector-flink-e2e-base</artifactId>
- <properties>
- <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
- </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-e2e-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
<build>
<plugins>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index 4fe612a09..6ecaa77cb 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -17,152 +17,42 @@
package org.apache.seatunnel.e2e.flink;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractFlinkContainer;
/**
* This class is the base class of FlinkEnvironment test for new seatunnel connector API.
* The before method will create a Flink cluster, and after method will close the Flink cluster.
* You can use {@link FlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job.
*/
-public abstract class FlinkContainer {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
-
- private static final String FLINK_DOCKER_IMAGE = "tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27";
- protected static final Network NETWORK = Network.newNetwork();
-
- protected GenericContainer<?> jobManager;
- protected GenericContainer<?> taskManager;
- private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
- private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink-new-connector.sh";
- private static final String SEATUNNEL_FLINK_JAR = "seatunnel-flink-starter.jar";
- private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
- private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
- private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
- private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
- private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();
-
- private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
-
- private static final String FLINK_PROPERTIES = String.join(
- "\n",
- Arrays.asList(
- "jobmanager.rpc.address: jobmanager",
- "taskmanager.numberOfTaskSlots: 10",
- "parallelism.default: 4",
- "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
-
- @BeforeEach
- public void before() {
- jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
- .withCommand("jobmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases("jobmanager")
- .withExposedPorts()
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+public abstract class FlinkContainer extends AbstractFlinkContainer {
- taskManager =
- new GenericContainer<>(FLINK_DOCKER_IMAGE)
- .withCommand("taskmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases("taskmanager")
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .dependsOn(jobManager)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
- Startables.deepStart(Stream.of(jobManager)).join();
- Startables.deepStart(Stream.of(taskManager)).join();
- copySeaTunnelFlinkFile();
- LOG.info("Flink containers are started.");
+ @Override
+ protected String getDockerImage() {
+ return "tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27";
}
- @AfterEach
- public void close() {
- if (taskManager != null) {
- taskManager.stop();
- }
- if (jobManager != null) {
- jobManager.stop();
- }
+ @Override
+ protected String getStartModulePath() {
+ return "seatunnel-flink-starter";
}
- public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException {
- final String confPath = getResource(confFile);
- if (!new File(confPath).exists()) {
- throw new IllegalArgumentException(confFile + " doesn't exist");
- }
- final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
- jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
- // Running IT use cases under Windows requires replacing \ with /
- String conf = targetConfInContainer.replaceAll("\\\\", "/");
- String binPath = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_FLINK_BIN).toString().replaceAll("\\\\", "/");
- final List<String> command = new ArrayList<>();
- command.add(binPath);
- command.add("--config " + conf);
-
- Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
- LOG.info(execResult.getStdout());
- LOG.error(execResult.getStderr());
- // wait job start
- Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
- return execResult;
+ @Override
+ protected String getStartShellName() {
+ return "start-seatunnel-flink-new-connector.sh";
}
- protected void copySeaTunnelFlinkFile() {
- // copy lib
- String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH
- + "/seatunnel-core/seatunnel-flink-starter/target/" + SEATUNNEL_FLINK_JAR;
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
- Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString());
-
- // copy bin
- String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-flink-starter/src/main/bin/" + SEATUNNEL_FLINK_BIN;
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(seatunnelFlinkBinPath),
- Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString());
-
- // copy connectors
- File jars = new File(PROJECT_ROOT_PATH +
- "/seatunnel-connectors-v2-dist/target/lib");
- Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> f.getName().startsWith("connector-"))))
- .forEach(jar ->
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(jar.getAbsolutePath()),
- getConnectorPath(jar.getName())));
-
- // copy plugin-mapping.properties
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
- Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
+ @Override
+ protected String getConnectorType() {
+ return "seatunnel";
}
- private String getResource(String confFile) {
- return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+ @Override
+ protected String getConnectorModulePath() {
+ return "seatunnel-connectors-v2";
}
- private String getConnectorPath(String fileName) {
- return Paths.get(SEATUNNEL_CONNECTORS, "seatunnel", fileName).toString();
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "connector-";
}
}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 4f5f037dd..045579f37 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -41,7 +41,7 @@
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core-flink</artifactId>
+ <artifactId>seatunnel-flink-starter</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml
index 32f26486d..7968e8526 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml
@@ -25,10 +25,14 @@
<artifactId>seatunnel-connector-flink-e2e-base</artifactId>
- <properties>
- <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
- </properties>
-
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-e2e-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
<build>
<plugins>
<plugin>
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index f3308dead..a1ec70269 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -17,152 +17,37 @@
package org.apache.seatunnel.e2e.flink;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractFlinkContainer;
/**
* This class is the base class of FlinkEnvironment test.
* The before method will create a Flink cluster, and after method will close the Flink cluster.
* You can use {@link FlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job.
*/
-public abstract class FlinkContainer {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
-
- private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
- protected static final Network NETWORK = Network.newNetwork();
-
- protected GenericContainer<?> jobManager;
- protected GenericContainer<?> taskManager;
- private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
- private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink.sh";
- private static final String SEATUNNEL_FLINK_JAR = "seatunnel-core-flink.jar";
- private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
- private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
- private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
- private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
- private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();
-
- private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
-
- private static final String FLINK_PROPERTIES = String.join(
- "\n",
- Arrays.asList(
- "jobmanager.rpc.address: jobmanager",
- "taskmanager.numberOfTaskSlots: 10",
- "parallelism.default: 4",
- "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
-
- @BeforeEach
- public void before() {
- jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
- .withCommand("jobmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases("jobmanager")
- .withExposedPorts()
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
- taskManager =
- new GenericContainer<>(FLINK_DOCKER_IMAGE)
- .withCommand("taskmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases("taskmanager")
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .dependsOn(jobManager)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+public abstract class FlinkContainer extends AbstractFlinkContainer {
- Startables.deepStart(Stream.of(jobManager)).join();
- Startables.deepStart(Stream.of(taskManager)).join();
- copySeaTunnelFlinkFile();
- LOG.info("Flink containers are started.");
+ @Override
+ protected String getStartModulePath() {
+ return "seatunnel-core-flink";
}
- @AfterEach
- public void close() {
- if (taskManager != null) {
- taskManager.stop();
- }
- if (jobManager != null) {
- jobManager.stop();
- }
+ @Override
+ protected String getStartShellName() {
+ return "start-seatunnel-flink.sh";
}
- public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException {
- final String confPath = getResource(confFile);
- if (!new File(confPath).exists()) {
- throw new IllegalArgumentException(confFile + " doesn't exist");
- }
- final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
- jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
- // Running IT use cases under Windows requires replacing \ with /
- String conf = targetConfInContainer.replaceAll("\\\\", "/");
- final List<String> command = new ArrayList<>();
- command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-flink.sh").toString());
- command.add("--config " + conf);
-
- Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
- LOG.info(execResult.getStdout());
- LOG.error(execResult.getStderr());
- // wait job start
- Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
- return execResult;
- }
-
- protected void copySeaTunnelFlinkFile() {
- // copy lib
- String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH
- + "/seatunnel-core/seatunnel-core-flink/target/seatunnel-core-flink.jar";
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
- Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString());
-
- // copy bin
- String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh";
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(seatunnelFlinkBinPath),
- Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString());
-
- // copy connectors
- File jars = new File(PROJECT_ROOT_PATH +
- "/seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib");
- Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> f.getName().startsWith("seatunnel-connector-flink"))))
- .forEach(jar ->
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(jar.getAbsolutePath()),
- getConnectorPath(jar.getName())));
-
- // copy plugin-mapping.properties
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
- Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
+ @Override
+ protected String getConnectorType() {
+ return "seatunnel";
}
- private String getResource(String confFile) {
- return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+ @Override
+ protected String getConnectorModulePath() {
+ return "seatunnel-connectors/seatunnel-connectors-flink";
}
- private String getConnectorPath(String fileName) {
- return Paths.get(SEATUNNEL_CONNECTORS, "flink", fileName).toString();
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "seatunnel-connector-flink-";
}
-
}
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
index 2cd8cacdb..5de92cec7 100644
--- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
@@ -25,9 +25,14 @@
<artifactId>setunnel-connector-flink-sql-e2e-base</artifactId>
- <properties>
- <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
- </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-e2e-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
<build>
<plugins>
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
index 082500856..82a7159c1 100644
--- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
@@ -17,128 +17,37 @@
package org.apache.seatunnel.e2e.flink.sql;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractFlinkContainer;
/**
* This class is the base class of FlinkEnvironment test.
* The before method will create a Flink cluster, and after method will close the Flink cluster.
- * You can use {@link FlinkContainer#executeSeaTunnelFlinkSqlJob(String)} to submit a seatunnel config and run a seatunnel job.
+ * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob(String)} to submit a seatunnel config and run a seatunnel job.
*/
-public abstract class FlinkContainer {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
-
- private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
- protected static final Network NETWORK = Network.newNetwork();
-
- protected GenericContainer<?> jobManager;
- protected GenericContainer<?> taskManager;
- private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
- private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-sql.sh";
- private static final String SEATUNNEL_FLINK_SQL_JAR = "seatunnel-core-flink-sql.jar";
- private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
- private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_FLINK_BIN).toString();
- private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_FLINK_SQL_JAR).toString();
-
- private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
-
- private static final String FLINK_PROPERTIES = String.join(
- "\n",
- Arrays.asList(
- "jobmanager.rpc.address: jobmanager",
- "taskmanager.numberOfTaskSlots: 10",
- "parallelism.default: 4",
- "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
+public abstract class FlinkContainer extends AbstractFlinkContainer {
- @BeforeEach
- public void before() {
- jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
- .withCommand("jobmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases("jobmanager")
- .withExposedPorts()
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
- taskManager =
- new GenericContainer<>(FLINK_DOCKER_IMAGE)
- .withCommand("taskmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases("taskmanager")
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .dependsOn(jobManager)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
- Startables.deepStart(Stream.of(jobManager)).join();
- Startables.deepStart(Stream.of(taskManager)).join();
- copySeaTunnelFlinkFile();
- LOG.info("Flink containers are started.");
+ @Override
+ protected String getStartModulePath() {
+ return "seatunnel-core-flink-sql";
}
- @AfterEach
- public void close() {
- if (taskManager != null) {
- taskManager.stop();
- }
- if (jobManager != null) {
- jobManager.stop();
- }
+ @Override
+ protected String getStartShellName() {
+ return "start-seatunnel-sql.sh";
}
- public Container.ExecResult executeSeaTunnelFlinkSqlJob(String confFile)
- throws IOException, InterruptedException, URISyntaxException {
- final String confPath = Paths.get(FlinkContainer.class.getResource(confFile).toURI()).toString();
- if (!new File(confPath).exists()) {
- throw new IllegalArgumentException(confFile + " doesn't exist");
- }
- final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
- jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
- // Running IT use cases under Windows requires replacing \ with /
- String conf = targetConfInContainer.replaceAll("\\\\", "/");
- final List<String> command = new ArrayList<>();
- command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-sql.sh").toString());
- command.add("--config " + conf);
-
- Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
- LOG.info(execResult.getStdout());
- LOG.error(execResult.getStderr());
- // wait job start
- Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
- return execResult;
+ @Override
+ protected String getConnectorType() {
+ return "flink-sql";
}
- protected void copySeaTunnelFlinkFile() {
- // copy lib
- String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/target/" + SEATUNNEL_FLINK_SQL_JAR;
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
- FLINK_JAR_PATH);
-
- // copy bin
- String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh";
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(seatunnelFlinkBinPath),
- Paths.get(SEATUNNEL_BIN).toString());
+ @Override
+ protected String getConnectorModulePath() {
+ return "seatunnel-connectors/seatunnel-connectors-flink-sql";
}
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "flink-sql-connector-";
+ }
}
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
index 05c5eb09c..b9f57920c 100644
--- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
@@ -24,14 +24,13 @@ import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import java.io.IOException;
-import java.net.URISyntaxException;
public class DatagenToConsoleIT extends FlinkContainer {
@Test
- public void testDatagenToConsole() throws IOException, URISyntaxException, InterruptedException {
+ public void testDatagenToConsole() throws IOException, InterruptedException {
final String configFile = "/fake/flink.sql.conf";
- Container.ExecResult execResult = executeSeaTunnelFlinkSqlJob(configFile);
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob(configFile);
Assertions.assertEquals(0, execResult.getExitCode());
}
}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml
index cbc8495c5..fcb3569ed 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml
@@ -25,9 +25,14 @@
<artifactId>connector-spark-e2e-base</artifactId>
- <properties>
- <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
- </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-e2e-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
<build>
<plugins>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index 2f74483f0..7ad7c69f5 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -17,145 +17,36 @@
package org.apache.seatunnel.e2e.spark;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractSparkContainer;
/**
* This class is the base class of SparkEnvironment test. The before method will create a Spark master, and after method will close the Spark master.
* You can use {@link SparkContainer#executeSeaTunnelSparkJob} to submit a seatunnel conf and a seatunnel spark job.
*/
-public abstract class SparkContainer {
-
- private static final Logger LOG = LoggerFactory.getLogger(SparkContainer.class);
-
- private static final String SPARK_DOCKER_IMAGE = "bitnami/spark:2.4.3";
- public static final Network NETWORK = Network.newNetwork();
-
- protected GenericContainer<?> master;
- private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
- private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark-new-connector.sh";
- private static final String SEATUNNEL_SPARK_JAR = "seatunnel-spark-starter.jar";
- private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
- private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
- private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
- private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString();
- private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();
-
- private static final int WAIT_SPARK_JOB_SUBMIT = 5000;
-
- @BeforeEach
- public void before() {
- master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases("spark-master")
- .withExposedPorts()
- .withEnv("SPARK_MODE", "master")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
- // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
- // start a worker.
-
- Startables.deepStart(Stream.of(master)).join();
- copySeaTunnelSparkFile();
- LOG.info("Spark container started");
- }
-
- @AfterEach
- public void close() {
- if (master != null) {
- master.stop();
- }
- }
-
- public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException {
- final String confPath = getResource(confFile);
- if (!new File(confPath).exists()) {
- throw new IllegalArgumentException(confFile + " doesn't exist");
- }
- final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
- master.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
- // TODO: use start-seatunnel-spark.sh to run the spark job. Need to modified the SparkStarter can find the seatunnel-core-spark.jar.
- final List<String> command = new ArrayList<>();
- String sparkBinPath = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_SPARK_BIN).toString();
- command.add(adaptPathForWin(sparkBinPath));
- command.add("--master");
- command.add("local");
- command.add("--deploy-mode");
- command.add("client");
- command.add("--config " + adaptPathForWin(targetConfInContainer));
-
- Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command));
- LOG.info(execResult.getStdout());
- LOG.error(execResult.getStderr());
- // wait job start
- Thread.sleep(WAIT_SPARK_JOB_SUBMIT);
- return execResult;
- }
-
- protected void copySeaTunnelSparkFile() {
- // copy lib
- String seatunnelCoreSparkJarPath = Paths.get(PROJECT_ROOT_PATH.toString(),
- "seatunnel-core", "seatunnel-spark-starter", "target", SEATUNNEL_SPARK_JAR).toString();
- master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath), SPARK_JAR_PATH);
-
- // copy bin
- String seatunnelSparkBinPath = Paths.get(PROJECT_ROOT_PATH.toString(),
- "seatunnel-core", "seatunnel-spark-starter", "src", "main", "bin", SEATUNNEL_SPARK_BIN).toString();
- master.copyFileToContainer(
- MountableFile.forHostPath(seatunnelSparkBinPath),
- Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString());
-
- // copy connectors
- getConnectorJarFiles()
- .forEach(jar ->
- master.copyFileToContainer(
- MountableFile.forHostPath(jar.getAbsolutePath()),
- getConnectorPath(jar.getName())));
+public abstract class SparkContainer extends AbstractSparkContainer {
- // copy plugin-mapping.properties
- master.copyFileToContainer(
- MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
- Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString());
+ @Override
+ protected String getStartModulePath() {
+ return "seatunnel-spark-starter";
}
- private String getResource(String confFile) {
- return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+ @Override
+ protected String getStartShellName() {
+ return "start-seatunnel-spark-new-connector.sh";
}
- private String getConnectorPath(String fileName) {
- return Paths.get(CONNECTORS_PATH, "seatunnel", fileName).toString();
+ @Override
+ protected String getConnectorType() {
+ return "seatunnel";
}
- private List<File> getConnectorJarFiles() {
- File jars = new File(PROJECT_ROOT_PATH +
- "/seatunnel-connectors-v2-dist/target/lib");
- return Arrays.stream(
- Objects.requireNonNull(
- jars.listFiles(
- f -> f.getName().contains("connector-"))))
- .collect(Collectors.toList());
+ @Override
+ protected String getConnectorModulePath() {
+ return "seatunnel-connectors-v2";
}
- private String adaptPathForWin(String path) {
- return path == null ? "" : path.replaceAll("\\\\", "/");
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "connector-";
}
}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index a7a084bac..b5aef0ec7 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -39,7 +39,7 @@
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core-spark</artifactId>
+ <artifactId>seatunnel-spark-starter</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml
index 7fa7dfaa0..2ff76f7db 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml
@@ -25,9 +25,14 @@
<artifactId>seatunnel-connector-spark-e2e-base</artifactId>
- <properties>
- <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
- </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-e2e-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
<build>
<plugins>
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index 6dca627ac..8811c06a4 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -17,141 +17,36 @@
package org.apache.seatunnel.e2e.spark;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractSparkContainer;
/**
* This class is the base class of SparkEnvironment test. The before method will create a Spark master, and after method will close the Spark master.
* You can use {@link SparkContainer#executeSeaTunnelSparkJob} to submit a seatunnel conf and a seatunnel spark job.
*/
-public abstract class SparkContainer {
-
- private static final Logger LOG = LoggerFactory.getLogger(SparkContainer.class);
-
- private static final String SPARK_DOCKER_IMAGE = "bitnami/spark:2.4.3";
- public static final Network NETWORK = Network.newNetwork();
-
- protected GenericContainer<?> master;
- private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
- private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark.sh";
- private static final String SEATUNNEL_SPARK_JAR = "seatunnel-core-spark.jar";
- private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
- private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
- private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
- private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString();
- private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();
-
- private static final int WAIT_SPARK_JOB_SUBMIT = 5000;
-
- @BeforeEach
- public void before() {
- master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases("spark-master")
- .withExposedPorts()
- .withEnv("SPARK_MODE", "master")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
- // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
- // start a worker.
-
- Startables.deepStart(Stream.of(master)).join();
- copySeaTunnelSparkFile();
- LOG.info("Spark container started");
- }
+public abstract class SparkContainer extends AbstractSparkContainer {
- @AfterEach
- public void close() {
- if (master != null) {
- master.stop();
- }
+ @Override
+ protected String getStartModulePath() {
+ return "seatunnel-core-spark";
}
- public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException {
- final String confPath = getResource(confFile);
- if (!new File(confPath).exists()) {
- throw new IllegalArgumentException(confFile + " doesn't exist");
- }
- final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
- master.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
- // TODO: use start-seatunnel-spark.sh to run the spark job. Need to modified the SparkStarter can find the seatunnel-core-spark.jar.
- // Running IT use cases under Windows requires replacing \ with /
- String conf = targetConfInContainer.replaceAll("\\\\", "/");
- final List<String> command = new ArrayList<>();
- command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-spark.sh").toString());
- command.add("--master");
- command.add("local");
- command.add("--deploy-mode");
- command.add("client");
- command.add("--config " + conf);
-
- Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command));
- LOG.info(execResult.getStdout());
- LOG.error(execResult.getStderr());
- // wait job start
- Thread.sleep(WAIT_SPARK_JOB_SUBMIT);
- return execResult;
- }
-
- protected void copySeaTunnelSparkFile() {
- // copy lib
- String seatunnelCoreSparkJarPath = PROJECT_ROOT_PATH
- + "/seatunnel-core/seatunnel-core-spark/target/seatunnel-core-spark.jar";
- master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath), SPARK_JAR_PATH);
-
- // copy bin
- String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh";
- master.copyFileToContainer(
- MountableFile.forHostPath(seatunnelFlinkBinPath),
- Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString());
-
- // copy connectors
- getConnectorJarFiles()
- .forEach(jar ->
- master.copyFileToContainer(
- MountableFile.forHostPath(jar.getAbsolutePath()),
- getConnectorPath(jar.getName())));
-
- // copy plugin-mapping.properties
- master.copyFileToContainer(
- MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
- Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString());
+ @Override
+ protected String getStartShellName() {
+ return "start-seatunnel-spark.sh";
}
- private String getResource(String confFile) {
- return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+ @Override
+ protected String getConnectorType() {
+ return "spark";
}
- private String getConnectorPath(String fileName) {
- return Paths.get(CONNECTORS_PATH, "spark", fileName).toString();
+ @Override
+ protected String getConnectorModulePath() {
+ return "seatunnel-connectors/seatunnel-connectors-spark";
}
- private List<File> getConnectorJarFiles() {
- File jars = new File(PROJECT_ROOT_PATH +
- "/seatunnel-connectors/seatunnel-connectors-spark-dist/target/lib");
- return Arrays.stream(
- Objects.requireNonNull(
- jars.listFiles(
- f -> f.getName().contains("seatunnel-connector-spark"))))
- .collect(Collectors.toList());
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "seatunnel-connector-spark-";
}
}