You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/19 10:28:14 UTC

[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2762: [Improve][e2e] Improve end-to-end testing of connector-v2 in a multi-engine environment

hailin0 commented on code in PR #2762:
URL: https://github.com/apache/incubator-seatunnel/pull/2762#discussion_r974077997


##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/TestCaseInvocationContextProvider.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.junit;
+
+import static org.apache.seatunnel.e2e.common.junit.ContainerTestingExtension.TEST_CONTAINERS_STORE_KEY;
+import static org.apache.seatunnel.e2e.common.junit.ContainerTestingExtension.TEST_EXTENDED_FACTORY_STORE_KEY;
+import static org.apache.seatunnel.e2e.common.junit.ContainerTestingExtension.TEST_RESOURCE_NAMESPACE;
+
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class TestCaseInvocationContextProvider implements TestTemplateInvocationContextProvider {
+
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context) {
+        // Only support test cases with TestContainer as parameter
+        Class<?>[] parameterTypes = context.getRequiredTestMethod().getParameterTypes();
+        return parameterTypes.length == 1 && Arrays.stream(parameterTypes)
+            .anyMatch(TestContainer.class::isAssignableFrom);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context) {
+        List<TestContainer> testContainers = (List<TestContainer>) context.getStore(TEST_RESOURCE_NAMESPACE)
+            .get(TEST_CONTAINERS_STORE_KEY);
+
+        ContainerExtendedFactory containerExtendedFactory = (ContainerExtendedFactory) context.getStore(TEST_RESOURCE_NAMESPACE)
+            .get(TEST_EXTENDED_FACTORY_STORE_KEY);
+
+        int containerAmount = testContainers.size();
+        return testContainers.stream()
+            .map(testContainer -> new TestResourceProvidingInvocationContext(testContainer, containerExtendedFactory, containerAmount));
+    }
+
+    static class TestResourceProvidingInvocationContext implements TestTemplateInvocationContext {
+        private final TestContainer testContainer;
+        private final ContainerExtendedFactory containerExtendedFactory;
+

Review Comment:
   Remove this extra blank line?



##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.flink;
+
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * This class is the base class of FlinkEnvironment test.
+ * The before method will create a Flink cluster, and after method will close the Flink cluster.
+ * You can use {@link TestContainer#executeJob} to submit a seatunnel config and run a seatunnel job.
+ */
+@NoArgsConstructor
+public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFlinkContainer.class);
+
+    protected static final Network NETWORK = Network.newNetwork();
+
+    protected static final List<String> DEFAULT_FLINK_PROPERTIES = Arrays.asList(
+        "jobmanager.rpc.address: jobmanager",
+        "taskmanager.numberOfTaskSlots: 10",
+        "parallelism.default: 4",
+        "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false");
+
+    protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+
+    @Override
+    protected String getDockerImage() {
+        return DEFAULT_DOCKER_IMAGE;
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        final String dockerImage = getDockerImage();
+        final String properties = String.join("\n", getFlinkProperties());
+        jobManager = new GenericContainer<>(dockerImage)
+            .withCommand("jobmanager")
+            .withNetwork(NETWORK)
+            .withNetworkAliases("jobmanager")
+            .withExposedPorts()
+            .withEnv("FLINK_PROPERTIES", properties)
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        taskManager =
+            new GenericContainer<>(dockerImage)
+                .withCommand("taskmanager")
+                .withNetwork(NETWORK)
+                .withNetworkAliases("taskmanager")
+                .withEnv("FLINK_PROPERTIES", properties)
+                .dependsOn(jobManager)
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        Startables.deepStart(Stream.of(taskManager)).join();
+        copySeaTunnelStarter(jobManager);
+        // execute extra commands
+        executeExtraCommands(jobManager);
+        LOG.info("Flink containers are started.");

Review Comment:
   Use `LOG.info("Flink[{}] container are started", identifier());`?



##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.flink;
+
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * This class is the base class of FlinkEnvironment test.
+ * The before method will create a Flink cluster, and after method will close the Flink cluster.
+ * You can use {@link TestContainer#executeJob} to submit a seatunnel config and run a seatunnel job.
+ */
+@NoArgsConstructor
+public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFlinkContainer.class);
+
+    protected static final Network NETWORK = Network.newNetwork();
+
+    protected static final List<String> DEFAULT_FLINK_PROPERTIES = Arrays.asList(
+        "jobmanager.rpc.address: jobmanager",
+        "taskmanager.numberOfTaskSlots: 10",
+        "parallelism.default: 4",
+        "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false");
+
+    protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+
+    @Override
+    protected String getDockerImage() {
+        return DEFAULT_DOCKER_IMAGE;
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        final String dockerImage = getDockerImage();
+        final String properties = String.join("\n", getFlinkProperties());
+        jobManager = new GenericContainer<>(dockerImage)
+            .withCommand("jobmanager")
+            .withNetwork(NETWORK)
+            .withNetworkAliases("jobmanager")
+            .withExposedPorts()
+            .withEnv("FLINK_PROPERTIES", properties)
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        taskManager =
+            new GenericContainer<>(dockerImage)
+                .withCommand("taskmanager")
+                .withNetwork(NETWORK)
+                .withNetworkAliases("taskmanager")
+                .withEnv("FLINK_PROPERTIES", properties)
+                .dependsOn(jobManager)
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        Startables.deepStart(Stream.of(taskManager)).join();
+        copySeaTunnelStarter(jobManager);
+        // execute extra commands
+        executeExtraCommands(jobManager);
+        LOG.info("Flink containers are started.");
+    }
+
+    protected List<String> getFlinkProperties() {
+        return DEFAULT_FLINK_PROPERTIES;
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (taskManager != null) {
+            taskManager.stop();
+        }
+        if (jobManager != null) {
+            jobManager.stop();
+        }

Review Comment:
   Use `LOG.info("Flink[{}] container are closed", identifier());`?



##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.spark;
+
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+public abstract class AbstractTestSparkContainer extends AbstractTestContainer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTestSparkContainer.class);
+    private static final String DEFAULT_DOCKER_IMAGE = "bitnami/spark:2.4.3";
+    public static final Network NETWORK = Network.newNetwork();
+
+    protected GenericContainer<?> master;
+
+    @Override
+    protected String getDockerImage() {
+        return DEFAULT_DOCKER_IMAGE;
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        master = new GenericContainer<>(getDockerImage())
+            .withNetwork(NETWORK)
+            .withNetworkAliases("spark-master")
+            .withExposedPorts()
+            .withEnv("SPARK_MODE", "master")
+            .withLogConsumer(new Slf4jLogConsumer(LOG))
+            .withCreateContainerCmdModifier(cmd -> cmd.withUser("root"));
+        // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
+        // start a worker.
+        Startables.deepStart(Stream.of(master)).join();
+        copySeaTunnelStarter(master);
+        // execute extra commands
+        executeExtraCommands(master);
+        LOG.info("Spark container started");

Review Comment:
   Use `LOG.info("Spark[{}] container started", identifier());`?



##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/TestSuiteBase.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common;
+
+import org.apache.seatunnel.e2e.common.container.TestContainersFactory;
+import org.apache.seatunnel.e2e.common.junit.ContainerTestingExtension;
+import org.apache.seatunnel.e2e.common.junit.TestCaseInvocationContextProvider;
+import org.apache.seatunnel.e2e.common.junit.TestContainers;
+import org.apache.seatunnel.e2e.common.junit.TestLoggerExtension;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith({
+    ContainerTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase {
+
+    @TestContainers
+    TestContainersFactory containersFactory = ContainerUtil::discoverTestContainers;

Review Comment:
   Use `private ...`?



##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.spark;
+
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+public abstract class AbstractTestSparkContainer extends AbstractTestContainer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTestSparkContainer.class);
+    private static final String DEFAULT_DOCKER_IMAGE = "bitnami/spark:2.4.3";
+    public static final Network NETWORK = Network.newNetwork();
+
+    protected GenericContainer<?> master;
+
+    @Override
+    protected String getDockerImage() {
+        return DEFAULT_DOCKER_IMAGE;
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        master = new GenericContainer<>(getDockerImage())
+            .withNetwork(NETWORK)
+            .withNetworkAliases("spark-master")
+            .withExposedPorts()
+            .withEnv("SPARK_MODE", "master")
+            .withLogConsumer(new Slf4jLogConsumer(LOG))
+            .withCreateContainerCmdModifier(cmd -> cmd.withUser("root"));
+        // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
+        // start a worker.
+        Startables.deepStart(Stream.of(master)).join();
+        copySeaTunnelStarter(master);
+        // execute extra commands
+        executeExtraCommands(master);
+        LOG.info("Spark container started");
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (master != null) {
+            master.stop();
+        }

Review Comment:
   Add `LOG.info("Spark[{}] container closed", identifier());`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org