You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/05/08 13:13:26 UTC
[incubator-seatunnel] branch dev updated: Add flink sql e2e module (#1828)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4cc20a4b Add flink sql e2e module (#1828)
4cc20a4b is described below
commit 4cc20a4beffd3824baeb2a0e25a79120cc982358
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sun May 8 21:13:21 2022 +0800
Add flink sql e2e module (#1828)
---
seatunnel-e2e/pom.xml | 1 +
seatunnel-e2e/seatunnel-flink-e2e/pom.xml | 2 +-
.../pom.xml | 15 +--
.../seatunnel/e2e/flink/sql/FlinkContainer.java | 136 +++++++++++++++++++++
.../e2e/flink/sql/fake/DatagenToConsoleIT.java | 37 ++++++
.../src/test/resources/fake/flink.sql.conf | 50 ++++++++
.../src/test/resources/log4j.properties | 23 ++++
seatunnel-e2e/seatunnel-spark-e2e/pom.xml | 2 +-
.../src/main/resources/log4j.properties | 22 ++++
.../src/main/resources/log4j.properties | 22 ++++
10 files changed, 301 insertions(+), 9 deletions(-)
diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml
index 5c37f80a..b05d304d 100644
--- a/seatunnel-e2e/pom.xml
+++ b/seatunnel-e2e/pom.xml
@@ -29,6 +29,7 @@
<modules>
<module>seatunnel-flink-e2e</module>
<module>seatunnel-spark-e2e</module>
+ <module>seatunnel-flink-sql-e2e</module>
</modules>
</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
index 5f44ffb7..04b992b9 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
@@ -36,7 +36,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
- <version>1.7.25</version>
+ <version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-sql-e2e/pom.xml
similarity index 92%
copy from seatunnel-e2e/seatunnel-spark-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-flink-sql-e2e/pom.xml
index 66da0231..284c138e 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/pom.xml
@@ -23,21 +23,22 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-spark-e2e</artifactId>
- <packaging>jar</packaging>
+ <artifactId>seatunnel-flink-sql-e2e</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-core-flink-sql</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core-spark</artifactId>
- <version>${project.version}</version>
- </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
new file mode 100644
index 00000000..8453515d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.sql;
+
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * Base class for Flink SQL end-to-end tests that run against a Flink cluster started in containers.
+ * The before method starts a JobManager and a TaskManager container and copies the SeaTunnel Flink SQL jar into the JobManager; the after method stops both containers.
+ * Use {@link FlinkContainer#executeSeaTunnelFlinkSqlJob(String)} to copy a SeaTunnel Flink SQL config into the JobManager and submit it with {@code flink run}.
+ */
+public abstract class FlinkContainer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
+
+ private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
+ protected static final Network NETWORK = Network.newNetwork(); // network both Flink containers are attached to
+
+ protected GenericContainer<?> jobManager;
+ protected GenericContainer<?> taskManager;
+ private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent(); // two levels above user.dir; presumably the repository root when run from this module - TODO confirm
+ private static final String SEATUNNEL_FLINK_SQL_JAR = "seatunnel-core-flink-sql.jar";
+ private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
+ private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_FLINK_SQL_JAR).toString(); // jar location inside the JobManager container
+
+ private static final int WAIT_FLINK_JOB_SUBMIT = 5000; // milliseconds; passed to Thread.sleep after a job is submitted
+
+ private static final String FLINK_PROPERTIES = String.join(
+ "\n",
+ Arrays.asList(
+ "jobmanager.rpc.address: jobmanager",
+ "taskmanager.numberOfTaskSlots: 10",
+ "parallelism.default: 4",
+ "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
+
+ @Before
+ public void before() {
+ jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
+ .withCommand("jobmanager")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jobmanager") // matches jobmanager.rpc.address in FLINK_PROPERTIES
+ .withExposedPorts() // NOTE(review): no ports are listed here - confirm whether any should be exposed
+ .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ taskManager =
+ new GenericContainer<>(FLINK_DOCKER_IMAGE)
+ .withCommand("taskmanager")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("taskmanager")
+ .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .dependsOn(jobManager)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ Startables.deepStart(Stream.of(jobManager)).join(); // block until the JobManager has started
+ Startables.deepStart(Stream.of(taskManager)).join(); // then start the TaskManager, which depends on it
+ copySeaTunnelFlinkFile();
+ LOG.info("Flink containers are started.");
+ }
+
+ @After
+ public void close() {
+ if (taskManager != null) {
+ taskManager.stop();
+ }
+ if (jobManager != null) {
+ jobManager.stop();
+ }
+ }
+
+ public Container.ExecResult executeSeaTunnelFlinkSqlJob(String confFile)
+ throws IOException, InterruptedException, URISyntaxException {
+ final String confPath = Paths.get(FlinkContainer.class.getResource(confFile).toURI()).toString(); // NOTE(review): getResource returns null for a missing resource, so this line fails before the exists() check below
+ if (!new File(confPath).exists()) {
+ throw new IllegalArgumentException(confFile + " doesn't exist");
+ }
+ final String targetConfInContainer = Paths.get("/tmp", confFile).toString(); // confFile path placed under /tmp inside the container
+ jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
+
+ // Paths built on a Windows host use \ as the separator; the container needs /, so replace it
+ String jar = FLINK_JAR_PATH.replaceAll("\\\\", "/");
+ String conf = targetConfInContainer.replaceAll("\\\\", "/");
+ final List<String> command = new ArrayList<>();
+ command.add("flink");
+ command.add("run");
+ command.add("-c org.apache.seatunnel.core.sql.SeatunnelSql " + jar);
+ command.add("--config " + conf);
+
+ Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command)); // the parts are joined into a single shell command line
+ LOG.info(execResult.getStdout());
+ LOG.error(execResult.getStderr()); // NOTE(review): stderr is logged at ERROR level regardless of the exit code
+ // Give the submitted job time to start before returning the result
+ Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
+ return execResult;
+ }
+
+ protected void copySeaTunnelFlinkFile() {
+ String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/target/" + SEATUNNEL_FLINK_SQL_JAR; // read from the module's target directory, so the jar must already be built
+ jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath), FLINK_JAR_PATH);
+ }
+
+}
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
new file mode 100644
index 00000000..a82a6098
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.sql.fake;
+
+import org.apache.seatunnel.e2e.flink.sql.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+public class DatagenToConsoleIT extends FlinkContainer {
+
+ @Test
+ public void testDatagenToConsole() throws IOException, URISyntaxException, InterruptedException {
+ final String configFile = "/fake/flink.sql.conf"; // classpath resource from src/test/resources
+ Container.ExecResult execResult = executeSeaTunnelFlinkSqlJob(configFile);
+ Assert.assertEquals(0, execResult.getExitCode()); // the flink run command must exit with status 0
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/fake/flink.sql.conf b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/fake/flink.sql.conf
new file mode 100644
index 00000000..a604c2d5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/fake/flink.sql.conf
@@ -0,0 +1,50 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
+-- This config file demonstrates Flink SQL processing in SeaTunnel: rows from a datagen source table are written to a print sink.
+--
+--
+
+
+SET 'table.dml-sync' = 'true';
+
+CREATE TABLE events (
+ f_type INT,
+ f_uid INT,
+ ts AS localtimestamp,
+ WATERMARK FOR ts AS ts
+) WITH (
+ 'connector' = 'datagen',
+ 'rows-per-second'='5',
+ 'fields.f_type.min'='1',
+ 'fields.f_type.max'='5',
+ 'fields.f_uid.kind'='sequence',
+ 'fields.f_uid.start'='1',
+ 'fields.f_uid.end'='100'
+);
+
+CREATE TABLE print_table (
+ type INT,
+ uid INT,
+ lstmt TIMESTAMP
+) WITH (
+ 'connector' = 'print',
+ 'sink.parallelism' = '1'
+);
+
+INSERT INTO print_table SELECT * FROM events;
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000..57b61a3c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Log messages at ERROR level and above to the console (System.err)
+log4j.rootCategory=ERROR, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index 66da0231..a613ca93 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -30,7 +30,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
- <version>1.7.25</version>
+ <version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/seatunnel-examples/seatunnel-flink-sql-examples/src/main/resources/log4j.properties b/seatunnel-examples/seatunnel-flink-sql-examples/src/main/resources/log4j.properties
new file mode 100644
index 00000000..db5d9e51
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-sql-examples/src/main/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Log messages at INFO level and above to the console (System.err)
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/log4j.properties b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/log4j.properties
new file mode 100644
index 00000000..db5d9e51
--- /dev/null
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Log messages at INFO level and above to the console (System.err)
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n