You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/05/08 13:13:26 UTC

[incubator-seatunnel] branch dev updated: Add flink sql e2e module (#1828)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4cc20a4b Add flink sql e2e module (#1828)
4cc20a4b is described below

commit 4cc20a4beffd3824baeb2a0e25a79120cc982358
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sun May 8 21:13:21 2022 +0800

    Add flink sql e2e module (#1828)
---
 seatunnel-e2e/pom.xml                              |   1 +
 seatunnel-e2e/seatunnel-flink-e2e/pom.xml          |   2 +-
 .../pom.xml                                        |  15 +--
 .../seatunnel/e2e/flink/sql/FlinkContainer.java    | 136 +++++++++++++++++++++
 .../e2e/flink/sql/fake/DatagenToConsoleIT.java     |  37 ++++++
 .../src/test/resources/fake/flink.sql.conf         |  50 ++++++++
 .../src/test/resources/log4j.properties            |  23 ++++
 seatunnel-e2e/seatunnel-spark-e2e/pom.xml          |   2 +-
 .../src/main/resources/log4j.properties            |  22 ++++
 .../src/main/resources/log4j.properties            |  22 ++++
 10 files changed, 301 insertions(+), 9 deletions(-)

diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml
index 5c37f80a..b05d304d 100644
--- a/seatunnel-e2e/pom.xml
+++ b/seatunnel-e2e/pom.xml
@@ -29,6 +29,7 @@
     <modules>
         <module>seatunnel-flink-e2e</module>
         <module>seatunnel-spark-e2e</module>
+        <module>seatunnel-flink-sql-e2e</module>
     </modules>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
index 5f44ffb7..04b992b9 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
@@ -36,7 +36,7 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.25</version>
+            <version>${slf4j.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-sql-e2e/pom.xml
similarity index 92%
copy from seatunnel-e2e/seatunnel-spark-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-flink-sql-e2e/pom.xml
index 66da0231..284c138e 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/pom.xml
@@ -23,21 +23,22 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-spark-e2e</artifactId>
-    <packaging>jar</packaging>
+    <artifactId>seatunnel-flink-sql-e2e</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-core-flink-sql</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.25</version>
+            <version>${slf4j.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-spark</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
new file mode 100644
index 00000000..8453515d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.sql;
+
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * This class is the base class of FlinkEnvironment test.
+ * The before method will create a Flink cluster, and after method will close the Flink cluster.
+ * You can use {@link FlinkContainer#executeSeaTunnelFlinkSqlJob(String)} to submit a seatunnel config and run a seatunnel job.
+ */
+public abstract class FlinkContainer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
+
+    private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
+    protected static final Network NETWORK = Network.newNetwork();
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+    private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent();
+    private static final String SEATUNNEL_FLINK_SQL_JAR = "seatunnel-core-flink-sql.jar";
+    private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
+    private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_FLINK_SQL_JAR).toString();
+
+    private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
+
+    private static final String FLINK_PROPERTIES = String.join(
+        "\n",
+        Arrays.asList(
+            "jobmanager.rpc.address: jobmanager",
+            "taskmanager.numberOfTaskSlots: 10",
+            "parallelism.default: 4",
+            "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
+
+    @Before
+    public void before() {
+        jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
+            .withCommand("jobmanager")
+            .withNetwork(NETWORK)
+            .withNetworkAliases("jobmanager")
+            .withExposedPorts()
+            .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        taskManager =
+            new GenericContainer<>(FLINK_DOCKER_IMAGE)
+                .withCommand("taskmanager")
+                .withNetwork(NETWORK)
+                .withNetworkAliases("taskmanager")
+                .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                .dependsOn(jobManager)
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        Startables.deepStart(Stream.of(taskManager)).join();
+        copySeaTunnelFlinkFile();
+        LOG.info("Flink containers are started.");
+    }
+
+    @After
+    public void close() {
+        if (taskManager != null) {
+            taskManager.stop();
+        }
+        if (jobManager != null) {
+            jobManager.stop();
+        }
+    }
+
+    public Container.ExecResult executeSeaTunnelFlinkSqlJob(String confFile)
+        throws IOException, InterruptedException, URISyntaxException {
+        final String confPath = Paths.get(FlinkContainer.class.getResource(confFile).toURI()).toString();
+        if (!new File(confPath).exists()) {
+            throw new IllegalArgumentException(confFile + " doesn't exist");
+        }
+        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
+        jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
+
+        // Running IT use cases under Windows requires replacing \ with /
+        String jar = FLINK_JAR_PATH.replaceAll("\\\\", "/");
+        String conf = targetConfInContainer.replaceAll("\\\\", "/");
+        final List<String> command = new ArrayList<>();
+        command.add("flink");
+        command.add("run");
+        command.add("-c org.apache.seatunnel.core.sql.SeatunnelSql " + jar);
+        command.add("--config " + conf);
+
+        Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
+        LOG.info(execResult.getStdout());
+        LOG.error(execResult.getStderr());
+        // wait job start
+        Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
+        return execResult;
+    }
+
+    protected void copySeaTunnelFlinkFile() {
+        String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/target/" + SEATUNNEL_FLINK_SQL_JAR;
+        jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath), FLINK_JAR_PATH);
+    }
+
+}
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
new file mode 100644
index 00000000..a82a6098
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.sql.fake;
+
+import org.apache.seatunnel.e2e.flink.sql.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+public class DatagenToConsoleIT extends FlinkContainer {
+
+    @Test
+    public void testDatagenToConsole() throws IOException, URISyntaxException, InterruptedException {
+        final String configFile = "/fake/flink.sql.conf";
+        Container.ExecResult execResult = executeSeaTunnelFlinkSqlJob(configFile);
+        Assert.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/fake/flink.sql.conf b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/fake/flink.sql.conf
new file mode 100644
index 00000000..a604c2d5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/fake/flink.sql.conf
@@ -0,0 +1,50 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
+-- This config file demonstrates a Flink SQL job run through SeaTunnel: rows produced by the
+-- datagen connector are written to the print connector.
+--
+
+
+-- Execute the INSERT below synchronously (Flink option table.dml-sync), so the submission
+-- waits for the job instead of returning right after it is submitted.
+SET 'table.dml-sync' = 'true';
+
+-- Source table: datagen emits 5 rows per second; f_type is generated within [1, 5] and
+-- f_uid is a sequence from 1 to 100. ts is a computed column (local timestamp) that also
+-- serves as the watermark / event-time attribute.
+CREATE TABLE events (
+  f_type INT,
+  f_uid INT,
+  ts AS localtimestamp,
+  WATERMARK FOR ts AS ts
+) WITH (
+  'connector' = 'datagen',
+  'rows-per-second'='5',
+  'fields.f_type.min'='1',
+  'fields.f_type.max'='5',
+  'fields.f_uid.kind'='sequence',
+  'fields.f_uid.start'='1',
+  'fields.f_uid.end'='100'
+);
+
+-- Sink table: the print connector with a single sink instance.
+CREATE TABLE print_table (
+  type INT,
+  uid INT,
+  lstmt TIMESTAMP
+) WITH (
+  'connector' = 'print',
+  'sink.parallelism' = '1'
+);
+
+-- Columns are mapped by position: f_type -> type, f_uid -> uid, ts -> lstmt.
+INSERT INTO print_table SELECT * FROM events;
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000..57b61a3c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Log only ERROR-level (and more severe) messages, to the console on stderr
+log4j.rootCategory=ERROR, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index 66da0231..a613ca93 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -30,7 +30,7 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.25</version>
+            <version>${slf4j.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git a/seatunnel-examples/seatunnel-flink-sql-examples/src/main/resources/log4j.properties b/seatunnel-examples/seatunnel-flink-sql-examples/src/main/resources/log4j.properties
new file mode 100644
index 00000000..db5d9e51
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-sql-examples/src/main/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/log4j.properties b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/log4j.properties
new file mode 100644
index 00000000..db5d9e51
--- /dev/null
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n