You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/09 03:23:51 UTC

[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2675: [Improve][e2e] Container only copy required connector jars

hailin0 commented on code in PR #2675:
URL: https://github.com/apache/incubator-seatunnel/pull/2675#discussion_r966585475


##########
seatunnel-e2e/seatunnel-flink-e2e-common/src/test/java/org/apache/seatunnel/e2e/flink/AbstractFlinkContainer.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink;
+
+import static org.apache.seatunnel.e2e.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.apache.seatunnel.e2e.ContainerUtil.adaptPathForWin;
+import static org.apache.seatunnel.e2e.ContainerUtil.copyConfigFileToContainer;
+import static org.apache.seatunnel.e2e.ContainerUtil.copyConnectorJarToContainer;
+import static org.apache.seatunnel.e2e.ContainerUtil.copySeaTunnelStarter;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * This class is the base class of FlinkEnvironment test.
+ * The before method will create a Flink cluster, and after method will close the Flink cluster.
+ * You can use {@link AbstractFlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractFlinkContainer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkContainer.class);
+
+    protected static final String START_ROOT_MODULE_NAME = "seatunnel-core";
+
+    protected static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
+
+    protected final String dockerImage;
+
+    protected final String startShellName;
+
+    protected final String startModuleName;
+
+    protected final String startModulePath;
+
+    protected final String connectorsRootPath;
+
+    protected final String connectorType;
+
+    protected final String connectorNamePrefix;
+    protected static final Network NETWORK = Network.newNetwork();
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+
+    public AbstractFlinkContainer(String dockerImage,
+                                  String startShellName,
+                                  String startModuleNameInSeaTunnelCore,
+                                  String connectorsRootPath,
+                                  String connectorType,
+                                  String connectorNamePrefix) {
+        this.dockerImage = dockerImage;
+        this.startShellName = startShellName;
+        this.connectorsRootPath = connectorsRootPath;
+        this.connectorType = connectorType;
+        this.connectorNamePrefix = connectorNamePrefix;
+        String[] moudules = startModuleNameInSeaTunnelCore.split(File.separator);
+        this.startModuleName = moudules[moudules.length - 1];
+        this.startModulePath = PROJECT_ROOT_PATH + File.separator +
+            START_ROOT_MODULE_NAME + File.separator + startModuleNameInSeaTunnelCore;
+    }
+
+    private static final String FLINK_PROPERTIES = String.join(

Review Comment:
   Move this static field up to the top of the class, next to the other static fields.



##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/ContainerUtil.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public final class ContainerUtil {
+
+    public static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
+
+    /**
+     * An error occurs when the user is not a submodule of seatunnel-e2e.
+     */
+    public static final String PROJECT_ROOT_PATH = System.getProperty("user.dir").split("/seatunnel-e2e/")[0];
+
+    public static void copyConnectorJarToContainer(GenericContainer<?> container,
+                                                   String confFile,
+                                                   String connectorsRootPath,
+                                                   String connectorPrefix,
+                                                   String connectorType,
+                                                   String seatunnelHome) {
+        Config jobConfig = getConfig(getConfigFile(confFile));
+        Config connectors = getConfig(new File(PROJECT_ROOT_PATH + File.separator + PLUGIN_MAPPING_FILE)).getConfig(connectorType);
+        Set<String> connectorNames = getConnectors(jobConfig, connectors, "source");
+        connectorNames.addAll(getConnectors(jobConfig, connectors, "sink"));
+        File module = new File(PROJECT_ROOT_PATH + File.separator + connectorsRootPath);
+
+        List<File> connectorFiles = getConnectorFiles(module, connectorNames, connectorPrefix);
+        connectorFiles.forEach(jar ->
+                container.copyFileToContainer(
+                    MountableFile.forHostPath(jar.getAbsolutePath()),
+                    Paths.get(Paths.get(seatunnelHome, "connectors").toString(), connectorType, jar.getName()).toString()));
+    }
+
+    public static String copyConfigFileToContainer(GenericContainer<?> container, String confFile) {
+        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
+        container.copyFileToContainer(MountableFile.forHostPath(getConfigFile(confFile).getAbsolutePath()), targetConfInContainer);
+        return targetConfInContainer;
+    }
+
+    public static void copySeaTunnelStarter(GenericContainer<?> container,
+                                              String startModuleName,
+                                              String startModulePath,
+                                              String seatunnelHomeInContainer,
+                                              String startShellName) {

Review Comment:
   Align the indentation of these continuation parameters with the first parameter.



##########
seatunnel-e2e/seatunnel-flink-e2e-common/src/test/java/org/apache/seatunnel/e2e/flink/AbstractFlinkContainer.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink;
+
+import static org.apache.seatunnel.e2e.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.apache.seatunnel.e2e.ContainerUtil.adaptPathForWin;
+import static org.apache.seatunnel.e2e.ContainerUtil.copyConfigFileToContainer;
+import static org.apache.seatunnel.e2e.ContainerUtil.copyConnectorJarToContainer;
+import static org.apache.seatunnel.e2e.ContainerUtil.copySeaTunnelStarter;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * This class is the base class of FlinkEnvironment test.
+ * The before method will create a Flink cluster, and after method will close the Flink cluster.
+ * You can use {@link AbstractFlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractFlinkContainer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkContainer.class);
+
+    protected static final String START_ROOT_MODULE_NAME = "seatunnel-core";
+
+    protected static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
+
+    protected final String dockerImage;
+
+    protected final String startShellName;
+
+    protected final String startModuleName;
+
+    protected final String startModulePath;
+
+    protected final String connectorsRootPath;
+
+    protected final String connectorType;
+
+    protected final String connectorNamePrefix;
+    protected static final Network NETWORK = Network.newNetwork();

Review Comment:
   Move this static field up, above the instance fields and next to the other static fields.



##########
seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java:
##########
@@ -17,128 +17,49 @@
 
 package org.apache.seatunnel.e2e.flink.sql;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import static org.apache.seatunnel.e2e.ContainerUtil.copyConfigFileToContainer;
+
+import org.apache.seatunnel.e2e.flink.AbstractFlinkContainer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Stream;
 
 /**
  * This class is the base class of FlinkEnvironment test.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
- * You can use {@link FlinkContainer#executeSeaTunnelFlinkSqlJob(String)} to submit a seatunnel config and run a seatunnel job.
+ * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob(String)} to submit a seatunnel config and run a seatunnel job.
  */
-public abstract class FlinkContainer {
+public abstract class FlinkContainer extends AbstractFlinkContainer {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
 
     private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
-    protected static final Network NETWORK = Network.newNetwork();
-
-    protected GenericContainer<?> jobManager;
-    protected GenericContainer<?> taskManager;
-    private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
-    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-sql.sh";
-    private static final String SEATUNNEL_FLINK_SQL_JAR = "seatunnel-core-flink-sql.jar";
-    private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
-    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_FLINK_BIN).toString();
-    private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_FLINK_SQL_JAR).toString();
-
-    private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
-
-    private static final String FLINK_PROPERTIES = String.join(
-        "\n",
-        Arrays.asList(
-            "jobmanager.rpc.address: jobmanager",
-            "taskmanager.numberOfTaskSlots: 10",
-            "parallelism.default: 4",
-            "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
 
-    @BeforeEach
-    public void before() {
-        jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
-            .withCommand("jobmanager")
-            .withNetwork(NETWORK)
-            .withNetworkAliases("jobmanager")
-            .withExposedPorts()
-            .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-            .withLogConsumer(new Slf4jLogConsumer(LOG));
+    private static final String START_SHELL_NAME = "start-seatunnel-sql.sh";
 
-        taskManager =
-            new GenericContainer<>(FLINK_DOCKER_IMAGE)
-                .withCommand("taskmanager")
-                .withNetwork(NETWORK)
-                .withNetworkAliases("taskmanager")
-                .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                .dependsOn(jobManager)
-                .withLogConsumer(new Slf4jLogConsumer(LOG));
+    private static final String START_MODULE_NAME = "seatunnel-core-flink-sql";
 
-        Startables.deepStart(Stream.of(jobManager)).join();
-        Startables.deepStart(Stream.of(taskManager)).join();
-        copySeaTunnelFlinkFile();
-        LOG.info("Flink containers are started.");
-    }
-
-    @AfterEach
-    public void close() {
-        if (taskManager != null) {
-            taskManager.stop();
-        }
-        if (jobManager != null) {
-            jobManager.stop();
-        }
-    }
+    private static final String CONNECTORS_ROOT_PATH = "seatunnel-connectors/seatunnel-connectors-flink-sql";
 
-    public Container.ExecResult executeSeaTunnelFlinkSqlJob(String confFile)
-        throws IOException, InterruptedException, URISyntaxException {
-        final String confPath = Paths.get(FlinkContainer.class.getResource(confFile).toURI()).toString();
-        if (!new File(confPath).exists()) {
-            throw new IllegalArgumentException(confFile + " doesn't exist");
-        }
-        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
-        jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
+    private static final String CONNECTOR_TYPE = "seatunnel-sql";
 
-        // Running IT use cases under Windows requires replacing \ with /
-        String conf = targetConfInContainer.replaceAll("\\\\", "/");
-        final List<String> command = new ArrayList<>();
-        command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-sql.sh").toString());
-        command.add("--config " + conf);
+    private static final String CONNECTOR_PREFIX = "flink-sql-connector-";
 
-        Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
-        LOG.info(execResult.getStdout());
-        LOG.error(execResult.getStderr());
-        // wait job start
-        Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
-        return execResult;
+    public FlinkContainer() {
+        super(FLINK_DOCKER_IMAGE,
+            START_SHELL_NAME,
+            START_MODULE_NAME,
+            CONNECTORS_ROOT_PATH,
+            CONNECTOR_TYPE,
+            CONNECTOR_PREFIX);
     }
 
-    protected void copySeaTunnelFlinkFile() {
-        // copy lib
-        String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/target/" + SEATUNNEL_FLINK_SQL_JAR;
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
-            FLINK_JAR_PATH);
-
-        // copy bin
-        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh";
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelFlinkBinPath),
-            Paths.get(SEATUNNEL_BIN).toString());
+    @Override
+    public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException {

Review Comment:
   Is there no need to call `copyConnectorJarToContainer` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org