You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/30 07:22:19 UTC

[inlong] branch master updated: [INLONG-4763][Sort] Import sort end2end unit test with sql file input (#4765)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c7f3c9bd [INLONG-4763][Sort] Import sort end2end unit test with sql file input (#4765)
3c7f3c9bd is described below

commit 3c7f3c9bdf1cc74906fbce567af5a6e21c344ec0
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Thu Jun 30 15:22:13 2022 +0800

    [INLONG-4763][Sort] Import sort end2end unit test with sql file input (#4765)
    
    Co-authored-by: thexia <15...@qq.com>
---
 .licenserc.yaml                                    |   3 +
 inlong-sort/pom.xml                                |   1 +
 .../sort/parser/impl/NativeFlinkSqlParser.java     |   4 +-
 .../sort/parser/result/FlinkSqlParseResult.java    |   6 +
 .../inlong/sort/parser/result/ParseResult.java     |   9 +-
 inlong-sort/sort-end-to-end-tests/pom.xml          | 217 +++++++++++++++
 .../org/apache/inlong/sort/tests/KafkaE2ECase.java | 182 +++++++++++++
 .../sort/tests/utils/FlinkContainerTestEnv.java    | 293 +++++++++++++++++++++
 .../apache/inlong/sort/tests/utils/JdbcProxy.java  | 103 ++++++++
 .../inlong/sort/tests/utils/MySqlContainer.java    | 204 ++++++++++++++
 .../apache/inlong/sort/tests/utils/TestUtils.java  | 109 ++++++++
 .../src/test/resources/docker/mysql/my.cnf         |  63 +++++
 .../src/test/resources/docker/mysql/setup.sql      |  25 ++
 .../src/test/resources/flinkSql/kafka_test.sql     |  75 ++++++
 .../src/test/resources/log4j2-test.properties      |  66 +++++
 licenses/inlong-sort/LICENSE                       |   1 -
 pom.xml                                            |  27 ++
 17 files changed, 1384 insertions(+), 4 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 15e26fa2d..927564f15 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -76,6 +76,9 @@ header:
     # Test case: temporary data for test cases
     - '**/AgentBaseTestsHelper/**'
 
+    # Test case: Flink SQL scripts for end2end test cases; SQL comment statements have only limited support
+    - '**/resources/flinkSql/**'
+
     # Referenced 3rd codes
     - '**/resources/assets/lib/**'
     - '**/resources/assets/public/**'
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index b70ee9758..4acf211af 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -40,6 +40,7 @@
         <module>sort-connectors</module>
         <module>sort-core</module>
         <module>sort-dist</module>
+        <module>sort-end-to-end-tests</module>
     </modules>
     <properties>
         <debezium.version>1.5.4.Final</debezium.version>
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
index af5401b69..bffd3d1c9 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
@@ -86,8 +86,8 @@ public class NativeFlinkSqlParser implements Parser {
                 createTableSqls.add(statement);
             } else if (statement.toUpperCase(Locale.ROOT).startsWith("INSERT INTO")) {
                 insertSqls.add(statement);
-            } else {
-                throw new IllegalArgumentException("not support sql: " + statement);
+            } else if (!statement.isEmpty()) {
+                log.warn("Not support sql statement: " + statement);
             }
         }
         return new FlinkSqlParseResult(tableEnv, createTableSqls, insertSqls);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/result/FlinkSqlParseResult.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/result/FlinkSqlParseResult.java
index 917378e7e..ca25126b1 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/result/FlinkSqlParseResult.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/result/FlinkSqlParseResult.java
@@ -57,6 +57,12 @@ public class FlinkSqlParseResult implements ParseResult, Serializable {
 
     @Override
     public void execute() throws Exception {
+        executeCreateTableSqls(createTableSqls);
+        executeLoadSqls(loadSqls);
+    }
+
+    @Override
+    public void waitExecute() throws Exception {
         executeCreateTableSqls(createTableSqls);
         executeLoadSqls(loadSqls).await();
     }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/result/ParseResult.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/result/ParseResult.java
index c8157f1c1..dbde7b1f5 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/result/ParseResult.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/result/ParseResult.java
@@ -23,12 +23,19 @@ package org.apache.inlong.sort.parser.result;
 public interface ParseResult {
 
     /**
-     * Execute the parse result
+     * Execute the parse result without waiting
      *
      * @throws Exception The exception may throws when executing
      */
     void execute() throws Exception;
 
+    /**
+     * Execute the parse result and wait until the result data is ready
+     *
+     * @throws Exception The exception may throws when executing
+     */
+    void waitExecute() throws Exception;
+
     /**
      * Try to execute, it mostly for unit test and syntax error checking
      *
diff --git a/inlong-sort/sort-end-to-end-tests/pom.xml b/inlong-sort/sort-end-to-end-tests/pom.xml
new file mode 100644
index 000000000..8c0300283
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>inlong-sort</artifactId>
+        <groupId>org.apache.inlong</groupId>
+        <version>1.3.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>sort-end-to-end-tests</artifactId>
+    <name>Apache InLong - Sort End to End Tests</name>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>jdbc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-dist</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-jars</id>
+                        <phase>pre-integration-test</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <artifactItems>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-dist</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>sort-dist.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies
+                            </outputDirectory>
+                        </artifactItem>
+                        <!-- JDBC driver, copied under the fixed name mysql-driver.jar that the tests look up -->
+                        <artifactItem>
+                            <groupId>mysql</groupId>
+                            <artifactId>mysql-connector-java</artifactId>
+                            <version>${mysql.jdbc.version}</version>
+                            <destFileName>mysql-driver.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-connector-kafka</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>sort-connector-kafka.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies
+                            </outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-connector-mysql-cdc</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>sort-connector-mysql-cdc.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies
+                            </outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-connector-jdbc</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>sort-connector-jdbc.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies
+                            </outputDirectory>
+                        </artifactItem>
+                    </artifactItems>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>end-to-end-tests</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <includes>
+                                <include>**/*.*</include>
+                            </includes>
+                            <forkCount>1</forkCount>
+                            <systemPropertyVariables>
+                                <moduleDir>${project.basedir}</moduleDir>
+                            </systemPropertyVariables>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
new file mode 100644
index 000000000..2f9b2128f
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
@@ -0,0 +1,182 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests;
+
+import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
+import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * End-to-end tests for sort-connector-kafka uber jar.
+ *
+ * <p>The Flink SQL job in {@code /flinkSql/kafka_test.sql} is submitted to the Flink containers managed by
+ * {@link FlinkContainerTestEnv}; rows written to the MySQL table {@code test_input} are then expected to
+ * appear in the MySQL table {@code test_output}.
+ */
+public class KafkaE2ECase extends FlinkContainerTestEnv {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaE2ECase.class);
+
+    private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar");
+    private static final Path jdbcJar = TestUtils.getResource("sort-connector-jdbc.jar");
+    private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
+    private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
+    // Don't use getResource("xxx").getPath(): Windows cannot resolve the path string it returns.
+    private static final String sqlFile;
+
+    static {
+        try {
+            sqlFile = Paths.get(KafkaE2ECase.class.getResource("/flinkSql/kafka_test.sql").toURI()).toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final String TOPIC = "test-topic";
+
+    @ClassRule
+    public static final KafkaContainer KAFKA =
+            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("kafka")
+                    .withEmbeddedZookeeper()
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    /**
+     * Creates the MySQL tables and the Kafka topic used by the test.
+     */
+    @Before
+    public void setup() {
+        initializeMysqlTable();
+        initializeKafkaTable();
+    }
+
+    /**
+     * Stops the Kafka container after each test method.
+     */
+    @After
+    public void teardown() {
+        if (KAFKA != null) {
+            KAFKA.stop();
+        }
+    }
+
+    /**
+     * Creates the topic {@code test-topic} by running {@code kafka-topics} inside the Kafka container.
+     *
+     * @throws RuntimeException if the command cannot be executed, is interrupted, or exits with a non-zero code
+     */
+    private void initializeKafkaTable() {
+        List<String> commands = new ArrayList<>();
+        commands.add("kafka-topics");
+        commands.add("--create");
+        commands.add("--topic");
+        commands.add(TOPIC);
+        commands.add("--replication-factor 1");
+        commands.add("--partitions 1");
+        commands.add("--zookeeper");
+        commands.add("localhost:" + KafkaContainer.ZOOKEEPER_PORT);
+        // The elements are joined into one shell command line, so "option value" pairs may share an element.
+        String command = String.join(" ", commands);
+        try {
+            LOG.info(command);
+            ExecResult result = KAFKA.execInContainer("bash", "-c", command);
+            LOG.info(result.getStdout());
+            if (result.getExitCode() != 0) {
+                // Include stderr: the exit code alone does not say why the topic could not be created.
+                throw new RuntimeException("Init kafka topic failed. Exit code:" + result.getExitCode()
+                        + ", stderr: " + result.getStderr());
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before wrapping, so the interruption is not lost.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates the tables {@code test_input} and {@code test_output} through a JDBC connection to the MySQL
+     * container.
+     *
+     * @throws RuntimeException wrapping the {@link SQLException} if a statement fails
+     */
+    private void initializeMysqlTable() {
+        try (Connection conn =
+                DriverManager.getConnection(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword());
+                Statement stat = conn.createStatement()) {
+            stat.execute(
+                    "CREATE TABLE test_input (\n"
+                            + "  id INTEGER NOT NULL,\n"
+                            + "  name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+                            + "  description VARCHAR(512),\n"
+                            + "  weight FLOAT,\n"
+                            + "  enum_c enum('red', 'white') default 'red',  -- test some complex types as well,\n"
+                            + "  json_c JSON, -- because we use additional dependencies to deserialize complex types.\n"
+                            + "  point_c POINT\n"
+                            + ");");
+            stat.execute(
+                    "CREATE TABLE test_output (\n"
+                            + "  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n"
+                            + "  name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+                            + "  description VARCHAR(512),\n"
+                            + "  weight FLOAT,\n"
+                            + "  enum_c VARCHAR(255),\n"
+                            + "  json_c VARCHAR(255),\n"
+                            + "  point_c VARCHAR(255)\n"
+                            + ");");
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Runs the Flink SQL job defined in {@code kafka_test.sql} and verifies that the rows inserted into
+     * {@code test_input} arrive in {@code test_output}.
+     *
+     * @throws Exception if submitting the job, writing the input rows or validating the output fails
+     */
+    @Test
+    public void testKafka() throws Exception {
+        submitSQLJob(sqlFile, kafkaJar, jdbcJar, mysqlJar, mysqlJdbcJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+
+        // generate input
+        try (Connection conn =
+                DriverManager.getConnection(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword());
+                Statement stat = conn.createStatement()) {
+            stat.execute(
+                    "INSERT INTO test_input "
+                            + "VALUES (1,'jacket','water resistent white wind breaker',0.2, null, null, null);");
+            stat.execute(
+                    "INSERT INTO test_input VALUES (2,'scooter','Big 2-wheel scooter ',5.18, null, null, null);");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        // validate output
+        JdbcProxy proxy =
+                new JdbcProxy(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword(), MYSQL_DRIVER_CLASS);
+        List<String> expectResult =
+                Arrays.asList(
+                        "1,jacket,water resistent white wind breaker,0.2,,,",
+                        "2,scooter,Big 2-wheel scooter ,5.18,,,");
+        proxy.checkResultWithTimeout(
+                expectResult,
+                "test_output",
+                7,
+                60000L);
+    }
+
+}
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
new file mode 100644
index 000000000..b7eba6c4c
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -0,0 +1,293 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.lifecycle.Startables;
+import sun.misc.IOUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.jar.JarOutputStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * End to end base test environment for test sort-connectors.
+ * Every link : MySQL -> Xxx (Test connector) -> MySQL
+ */
+public abstract class FlinkContainerTestEnv extends TestLogger {
+    private static final Logger JM_LOG = LoggerFactory.getLogger(JobMaster.class);
+    private static final Logger TM_LOG = LoggerFactory.getLogger(TaskExecutor.class);
+    private static final Logger MYSQL_LOG = LoggerFactory.getLogger(MySqlContainer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnv.class);
+
+    private static final Path SORT_DIST_JAR = TestUtils.getResource("sort-dist.jar");
+    // ------------------------------------------------------------------------------------------
+    // Flink Variables
+    // ------------------------------------------------------------------------------------------
+    private static final int JOB_MANAGER_REST_PORT = 8081;
+    private static final int DEBUG_PORT = 20000;
+    private static final String FLINK_BIN = "bin";
+    private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
+    private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
+    private static final String FLINK_PROPERTIES = String.join("\n", Arrays.asList(
+            "jobmanager.rpc.address: jobmanager",
+            "taskmanager.numberOfTaskSlots: 10",
+            "parallelism.default: 4",
+            "env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=20000",
+            "env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=20000",
+            // this is needed for oracle-cdc tests.
+            // see https://stackoverflow.com/a/47062742/4915129
+            "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
+
+    @ClassRule
+    public static final Network NETWORK = Network.newNetwork();
+
+    @Rule
+    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Nullable
+    private RestClusterClient<StandaloneClusterId> restClusterClient;
+
+    private GenericContainer<?> jobManager;
+    private GenericContainer<?> taskManager;
+
+
+    // ----------------------------------------------------------------------------------------
+    // MYSQL Variables
+    // ----------------------------------------------------------------------------------------
+    protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+    private static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+    @ClassRule
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer) new MySqlContainer()
+                    .withConfigurationOverride("docker/mysql/my.cnf")
+                    .withSetupSQL("docker/mysql/setup.sql")
+                    .withDatabaseName("test")
+                    .withUsername("flinkuser")
+                    .withPassword("flinkpw")
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
+                    .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG));
+
+    /**
+     * Creates and starts a Flink job manager container and a task manager container before each test.
+     */
+    @Before
+    public void before() {
+        LOG.info("Starting containers...");
+        final String flinkImage = "flink:1.13.5-scala_2.11";
+        final Slf4jLogConsumer jobManagerLogs = new Slf4jLogConsumer(JM_LOG);
+        final Slf4jLogConsumer taskManagerLogs = new Slf4jLogConsumer(TM_LOG);
+
+        jobManager = new GenericContainer<>(flinkImage)
+                .withCommand("jobmanager")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
+                .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT)
+                .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                .withLogConsumer(jobManagerLogs);
+
+        // The task manager declares the job manager as a dependency.
+        taskManager = new GenericContainer<>(flinkImage)
+                .withCommand("taskmanager")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
+                .withExposedPorts(DEBUG_PORT)
+                .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                .dependsOn(jobManager)
+                .withLogConsumer(taskManagerLogs);
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        Startables.deepStart(Stream.of(taskManager)).join();
+        LOG.info("Containers are started.");
+    }
+
+    /**
+     * Closes the REST client, if one was created, and stops the Flink and MySQL containers after each test.
+     */
+    @After
+    public void after() {
+        try {
+            if (restClusterClient != null) {
+                restClusterClient.close();
+            }
+        } finally {
+            // Forget the closed client so getRestClusterClient() cannot hand it out again, and stop the
+            // containers even when closing the client fails.
+            restClusterClient = null;
+            if (jobManager != null) {
+                jobManager.stop();
+            }
+            if (taskManager != null) {
+                taskManager.stop();
+            }
+            if (MYSQL != null) {
+                MYSQL.stop();
+            }
+        }
+    }
+
+    /**
+     * Submits a SQL job to the running cluster.
+     *
+     * <p>The SQL file and the result of {@code constructDistJar(jars)} are passed through
+     * {@code copyToContainerTmpPath} for the job manager container, then {@code flink run -d} is executed in
+     * that container with {@code org.apache.inlong.sort.Entrance} as entry class.
+     *
+     * <p><b>NOTE:</b> You should not use {@code '\t'}.
+     *
+     * @param sqlFile the SQL script to submit
+     * @param jars the jars handed to {@code constructDistJar}
+     * @throws IOException if preparing the files or executing the command fails
+     * @throws InterruptedException if the thread is interrupted while the command runs
+     * @throws AssertionError if the submit command exits with a non-zero code
+     */
+    public void submitSQLJob(String sqlFile, Path... jars)
+            throws IOException, InterruptedException {
+        final List<String> commands = new ArrayList<>();
+        String containerSqlFile = copyToContainerTmpPath(jobManager, sqlFile);
+        commands.add(FLINK_BIN + "/flink run -d");
+        commands.add("-c org.apache.inlong.sort.Entrance");
+        commands.add(copyToContainerTmpPath(jobManager, constructDistJar(jars)));
+        commands.add("--sql.script.file");
+        commands.add(containerSqlFile);
+
+        ExecResult execResult =
+                jobManager.execInContainer("bash", "-c", String.join(" ", commands));
+        LOG.info(execResult.getStdout());
+        if (execResult.getExitCode() != 0) {
+            LOG.error(execResult.getStderr());
+            // Carry the diagnostics in the failure itself so they show up in the test report.
+            throw new AssertionError("Failed when submitting the SQL job. Exit code: "
+                    + execResult.getExitCode() + ", stderr: " + execResult.getStderr());
+        }
+        // stderr of a successful submission is not an error; keep it visible at INFO level.
+        LOG.info(execResult.getStderr());
+    }
+
+    /**
+     * Returns the {@link RestClusterClient} connected to this FlinkContainer.
+     *
+     * <p>The client is created on first use, pointing at the JobManager's host and its mapped
+     * REST port, and the same instance is returned on later calls.
+     *
+     * @return the cached REST cluster client
+     * @throws IllegalStateException if the JobManager is not running or the client cannot be
+     *     created
+     */
+    public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
+        if (restClusterClient == null) {
+            checkState(
+                    jobManager.isRunning(),
+                    "Cluster client should only be retrieved for a running cluster");
+            try {
+                final Configuration restConfig = new Configuration();
+                restConfig.set(RestOptions.ADDRESS, jobManager.getHost());
+                restConfig.set(RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
+                restClusterClient =
+                        new RestClusterClient<>(restConfig, StandaloneClusterId.getInstance());
+            } catch (Exception e) {
+                throw new IllegalStateException(
+                        "Failed to create client for Flink container cluster", e);
+            }
+        }
+        return restClusterClient;
+    }
+
+    /**
+     * Polls the cluster until the first listed job reaches {@link JobStatus#RUNNING}.
+     *
+     * <p>Only the first job returned by the cluster is inspected. Errors while fetching the job
+     * list are logged and the poll is retried. If the timeout elapses before the job is running,
+     * a warning is logged and the method returns normally.
+     *
+     * @param timeout maximum time to keep polling
+     * @throws ValidationException if the job is found in a terminal state
+     * @throws IllegalStateException if the calling thread is interrupted while waiting
+     */
+    public void waitUntilJobRunning(Duration timeout) {
+        // Pause between polls so the REST endpoint is not hammered in a tight loop.
+        final long pollIntervalMillis = 500L;
+        RestClusterClient<?> clusterClient = getRestClusterClient();
+        Deadline deadline = Deadline.fromNow(timeout);
+        while (deadline.hasTimeLeft()) {
+            Collection<JobStatusMessage> jobStatusMessages = null;
+            try {
+                jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // Restore the flag instead of swallowing the interruption and retrying.
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Interrupted while waiting for the job to run.", e);
+            } catch (Exception e) {
+                LOG.warn("Error when fetching job status.", e);
+            }
+            if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
+                JobStatusMessage message = jobStatusMessages.iterator().next();
+                JobStatus jobStatus = message.getJobState();
+                if (jobStatus.isTerminalState()) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Job has been terminated! JobName: %s, JobID: %s, Status: %s",
+                                    message.getJobName(),
+                                    message.getJobId(),
+                                    message.getJobState()));
+                } else if (jobStatus == JobStatus.RUNNING) {
+                    return;
+                }
+            }
+            try {
+                Thread.sleep(pollIntervalMillis);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Interrupted while waiting for the job to run.", e);
+            }
+        }
+        LOG.warn("Job did not reach the RUNNING state within " + timeout + ".");
+    }
+
+    /**
+     * Builds a single jar containing every entry of the sort dist jar plus each of the given
+     * jars stored under a {@code lib/} entry.
+     * Flink per-job mode only support upload one jar to cluster.
+     *
+     * @param jars additional jars to embed under {@code lib/}
+     * @return absolute path of the newly created jar inside the temporary folder
+     * @throws IOException if the jar cannot be created or written
+     */
+    private String constructDistJar(Path... jars) throws IOException {
+
+        File newJar = temporaryFolder.newFile("sort-dist.jar");
+        try (
+                JarFile jarFile = new JarFile(SORT_DIST_JAR.toFile());
+                JarOutputStream jos = new JarOutputStream(new FileOutputStream(newJar))
+            ) {
+            jarFile.stream().forEach(entry -> {
+                try (InputStream is = jarFile.getInputStream(entry)) {
+                    // Write a fresh entry rather than the source one: the source entry carries the
+                    // compressed size recorded in the original jar, and closeEntry() rejects the
+                    // entry when re-compression produces a different size.
+                    JarEntry copiedEntry = new JarEntry(entry.getName());
+                    copiedEntry.setTime(entry.getTime());
+                    jos.putNextEntry(copiedEntry);
+                    // NOTE(review): relies on available() reporting the whole remaining entry.
+                    jos.write(IOUtils.readNBytes(is, is.available()));
+                    jos.closeEntry();
+                } catch (IOException e) {
+                    // forEach cannot propagate checked exceptions
+                    throw new RuntimeException(e);
+                }
+            });
+
+            for (Path jar : jars) {
+                try (InputStream is = new FileInputStream(jar.toFile())) {
+                    jos.putNextEntry(new JarEntry("lib/" + jar.getFileName().toString()));
+                    jos.write(IOUtils.readNBytes(is, is.available()));
+                    jos.closeEntry();
+                }
+            }
+
+        }
+        return newJar.getAbsolutePath();
+    }
+
+    // Should not a big file, all file data will load into memory, then copy to container.
+    private String copyToContainerTmpPath(GenericContainer<?> container, String filePath) throws IOException {
+        Path path = Paths.get(filePath);
+        byte[] fileData = Files.readAllBytes(path);
+            String containerPath = "/tmp/" + path.getFileName();
+        container.copyFileToContainer(Transferable.of(fileData), containerPath);
+        return containerPath;
+    }
+}
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java
new file mode 100644
index 000000000..d9458b591
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java
@@ -0,0 +1,103 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Proxy to communicate with database using JDBC protocol.
+ *
+ * <p>Every check opens its own connection with the URL, credentials and driver class given at
+ * construction time.
+ */
+public class JdbcProxy {
+
+    private final String url;
+    private final String userName;
+    private final String password;
+    private final String driverClass;
+
+    /**
+     * @param url JDBC url of the database
+     * @param userName user used to connect
+     * @param password password used to connect
+     * @param driverClass fully qualified name of the JDBC driver class, loaded before connecting
+     */
+    public JdbcProxy(String url, String userName, String password, String driverClass) {
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+        this.driverClass = driverClass;
+    }
+
+    /**
+     * Compare db select result and expected result in one time.
+     *
+     * <p>Each row of {@code select * from <table>} is rendered as its first {@code fieldsLen}
+     * column values joined with {@code ","} ({@code null} values become the text "null"). Both
+     * the rendered rows and the expected rows are sorted before they are compared, so row order
+     * does not matter. The given {@code expectedResult} list is not modified.
+     *
+     * @param expectedResult expected rows, each in the comma-joined form described above
+     * @param table name of the table to read
+     * @param fieldsLen number of leading columns to read from every row
+     * @throws SQLException if connecting to or querying the database fails
+     * @throws ClassNotFoundException if the driver class cannot be loaded
+     */
+    public void checkResult(List<String> expectedResult, String table, int fieldsLen)
+            throws SQLException, ClassNotFoundException {
+        Class.forName(driverClass);
+        try (Connection dbConn = DriverManager.getConnection(url, userName, password);
+                PreparedStatement statement = dbConn.prepareStatement("select * from " + table);
+                ResultSet resultSet = statement.executeQuery()) {
+            List<String> results = new ArrayList<>();
+            while (resultSet.next()) {
+                List<String> result = new ArrayList<>();
+                for (int fieldIndex = 1; fieldIndex <= fieldsLen; fieldIndex++) {
+                    Object value = resultSet.getObject(fieldIndex);
+                    if (value == null) {
+                        result.add("null");
+                    } else {
+                        result.add(value.toString());
+                    }
+                }
+
+                results.add(StringUtils.join(result, ","));
+            }
+            Collections.sort(results);
+            // Sort a copy: the caller's list must not be reordered and may be unmodifiable.
+            List<String> sortedExpected = new ArrayList<>(expectedResult);
+            Collections.sort(sortedExpected);
+            assertEquals(sortedExpected, results);
+        }
+    }
+
+    /**
+     * Repeats {@link #checkResult(List, String, int)} once per second until it passes or the
+     * timeout elapses.
+     *
+     * <p>Assertion failures and SQL errors seen before the deadline are ignored and retried. If
+     * no attempt passed in time, one last check is run and its failure is propagated.
+     *
+     * @param expectedResult expected rows, see {@link #checkResult(List, String, int)}
+     * @param table name of the table to read
+     * @param fieldsLen number of leading columns to read from every row
+     * @param timeout maximum time to keep retrying, in milliseconds
+     * @throws Exception the failure of the final check, or an interruption while waiting
+     */
+    public void checkResultWithTimeout(
+            List<String> expectedResult, String table, int fieldsLen, long timeout)
+            throws Exception {
+        long endTimeout = System.currentTimeMillis() + timeout;
+        boolean result = false;
+        while (System.currentTimeMillis() < endTimeout) {
+            try {
+                checkResult(expectedResult, table, fieldsLen);
+                result = true;
+                break;
+            } catch (AssertionError | SQLException throwable) {
+                // Data may not have arrived yet; wait and try again.
+                Thread.sleep(1000L);
+            }
+        }
+        if (!result) {
+            // Final attempt whose failure is reported to the caller.
+            checkResult(expectedResult, table, fieldsLen);
+        }
+    }
+}
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
new file mode 100644
index 000000000..a2d13ac4d
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
@@ -0,0 +1,204 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Docker container for MySQL. The difference between this class and {@link
+ * org.testcontainers.containers.MySQLContainer} is that TC MySQLContainer has problems when
+ * overriding mysql conf file, i.e. my.cnf.
+ */
+@SuppressWarnings("rawtypes")
+public class MySqlContainer extends JdbcDatabaseContainer {
+
+    public static final String IMAGE = "mysql";
+    public static final Integer MYSQL_PORT = 3306;
+
+    private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
+    private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
+    private static final String MYSQL_ROOT_USER = "root";
+
+    private String databaseName = "test";
+    private String username = "inlong";
+    private String password = "inlong";
+
+    public MySqlContainer() {
+        this(MySqlVersion.V5_7);
+    }
+
+    public MySqlContainer(MySqlVersion version) {
+        super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
+        addExposedPort(MYSQL_PORT);
+    }
+
+    @Override
+    protected Set<Integer> getLivenessCheckPorts() {
+        return new HashSet<>(getMappedPort(MYSQL_PORT));
+    }
+
+    @Override
+    protected void configure() {
+        // HERE is the difference, copy to /etc/mysql/, if copy to /etc/mysql/conf.d will be wrong
+        optionallyMapResourceParameterAsVolume(
+                MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");
+
+        if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
+            optionallyMapResourceParameterAsVolume(
+                    SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
+        }
+
+        addEnv("MYSQL_DATABASE", databaseName);
+        addEnv("MYSQL_USER", username);
+        if (password != null && !password.isEmpty()) {
+            addEnv("MYSQL_PASSWORD", password);
+            addEnv("MYSQL_ROOT_PASSWORD", password);
+        } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
+            addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
+        } else {
+            throw new ContainerLaunchException(
+                    "Empty password can be used only with the root user");
+        }
+        setStartupAttempts(3);
+    }
+
+    @Override
+    public String getDriverClassName() {
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            return "com.mysql.cj.jdbc.Driver";
+        } catch (ClassNotFoundException e) {
+            return "com.mysql.jdbc.Driver";
+        }
+    }
+
+    public String getJdbcUrl(String databaseName) {
+        String additionalUrlParams = constructUrlParameters("?", "&");
+        return "jdbc:mysql://"
+                + getHost()
+                + ":"
+                + getDatabasePort()
+                + "/"
+                + databaseName
+                + additionalUrlParams;
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return getJdbcUrl(databaseName);
+    }
+
+    public int getDatabasePort() {
+        return getMappedPort(MYSQL_PORT);
+    }
+
+    @Override
+    protected String constructUrlForConnection(String queryString) {
+        String url = super.constructUrlForConnection(queryString);
+
+        if (!url.contains("useSSL=")) {
+            String separator = url.contains("?") ? "&" : "?";
+            url = url + separator + "useSSL=false";
+        }
+
+        if (!url.contains("allowPublicKeyRetrieval=")) {
+            url = url + "&allowPublicKeyRetrieval=true";
+        }
+
+        return url;
+    }
+
+    @Override
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    @Override
+    public String getUsername() {
+        return username;
+    }
+
+    @Override
+    public String getPassword() {
+        return password;
+    }
+
+    @Override
+    protected String getTestQueryString() {
+        return "SELECT 1";
+    }
+
+    @SuppressWarnings("unchecked")
+    public MySqlContainer withConfigurationOverride(String s) {
+        parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
+        return this;
+    }
+
+    @SuppressWarnings("unchecked")
+    public MySqlContainer withSetupSQL(String sqlPath) {
+        parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withDatabaseName(final String databaseName) {
+        this.databaseName = databaseName;
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withUsername(final String username) {
+        this.username = username;
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withPassword(final String password) {
+        this.password = password;
+        return this;
+    }
+
+    /** MySql version enum. */
+    public enum MySqlVersion {
+        V5_5("5.5"),
+        V5_6("5.6"),
+        V5_7("5.7"),
+        V8_0("8.0");
+
+        private String version;
+
+        MySqlVersion(String version) {
+            this.version = version;
+        }
+
+        public String getVersion() {
+            return version;
+        }
+
+        @Override
+        public String toString() {
+            return "MySqlVersion{" + "version='" + version + '\'' + '}';
+        }
+    }
+}
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java
new file mode 100644
index 000000000..beeb5edd9
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java
@@ -0,0 +1,109 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Test util for test container.
+ */
+public class TestUtils {
+    private static final ParameterProperty<Path> MODULE_DIRECTORY =
+            new ParameterProperty<>("moduleDir", Paths::get);
+
+    /**
+     * Searches for a resource file matching the given regex in the given directory. This method is
+     * primarily intended to be used for the initialization of static {@link Path} fields for
+     * resource file(i.e. jar, config file) that reside in the modules {@code target} directory.
+     *
+     * <p>The directory searched is the value of the {@code moduleDir} system property, or the
+     * current working directory when that property is not set. The regex is matched with
+     * {@code find()} against the absolute path of every file and directory below it.
+     *
+     * @param resourceNameRegex regex pattern to match against
+     * @return Path pointing to the matching jar
+     * @throws RuntimeException if none or multiple resource files could be found
+     */
+    public static Path getResource(final String resourceNameRegex) {
+        // if the property is not set then we are most likely running in the IDE, where the working
+        // directory is the
+        // module of the test that is currently running, which is exactly what we want
+        Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
+
+        // Compile once instead of once per visited path.
+        final Pattern resourcePattern = Pattern.compile(resourceNameRegex);
+
+        try (Stream<Path> dependencyResources = Files.walk(moduleDirectory)) {
+            final List<Path> matchingResources =
+                    dependencyResources
+                            .filter(
+                                    jar ->
+                                            resourcePattern
+                                                    .matcher(jar.toAbsolutePath().toString())
+                                                    .find())
+                            .collect(Collectors.toList());
+            switch (matchingResources.size()) {
+                case 0:
+                    throw new RuntimeException(
+                            new FileNotFoundException(
+                                    String.format(
+                                            "No resource file could be found that matches the pattern %s. "
+                                                    + "This could mean that the test module must be rebuilt via maven.",
+                                            resourceNameRegex)));
+                case 1:
+                    return matchingResources.get(0);
+                default:
+                    throw new RuntimeException(
+                            new IOException(
+                                    String.format(
+                                            "Multiple resource files were found matching the pattern %s. Matches=%s",
+                                            resourceNameRegex, matchingResources)));
+            }
+        } catch (final IOException ioe) {
+            throw new RuntimeException("Could not search for resource resource files.", ioe);
+        }
+    }
+
+    /**
+     * A simple system properties value getter with default value when could not find the system property.
+     *
+     * @param <V> type the property value is converted to
+     */
+    static class ParameterProperty<V> {
+
+        private final String propertyName;
+        private final Function<String, V> converter;
+
+        /**
+         * @param propertyName name of the system property to read
+         * @param converter converts the raw property string into the target type
+         */
+        public ParameterProperty(final String propertyName, final Function<String, V> converter) {
+            this.propertyName = propertyName;
+            this.converter = converter;
+        }
+
+        /**
+         * Retrieves the value of this property, or the given default if no value was set.
+         *
+         * @param defaultValue value returned when the system property is not set
+         * @return the value of this property, or the given default if no value was set
+         */
+        public V get(final V defaultValue) {
+            final String value = System.getProperty(propertyName);
+            return value == null ? defaultValue : converter.apply(value);
+        }
+    }
+}
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/my.cnf b/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/my.cnf
new file mode 100644
index 000000000..87a492c49
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/my.cnf
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+#secure-file-priv=/var/lib/mysql-files
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id         = 223344
+log_bin           = mysql-bin
+expire_logs_days  = 1
+binlog_format     = row
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql b/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql
new file mode 100644
index 000000000..9ec4b48bb
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql
@@ -0,0 +1,25 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--   http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- In production you would almost certainly restrict the replication user to the follower (slave) machine,
+-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+-- 2) 'inlong' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES  ON *.* TO 'flinkuser'@'%';
+CREATE USER 'inlong' IDENTIFIED BY 'inlong';
+GRANT ALL PRIVILEGES ON *.* TO 'inlong'@'%';
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/flinkSql/kafka_test.sql b/inlong-sort/sort-end-to-end-tests/src/test/resources/flinkSql/kafka_test.sql
new file mode 100644
index 000000000..9ab70bb67
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/flinkSql/kafka_test.sql
@@ -0,0 +1,75 @@
+CREATE TABLE test_input (
+    `id` INT NOT NULL,
+    name STRING,
+    description STRING,
+    weight DECIMAL(10,3),
+    enum_c STRING,
+    json_c STRING,
+    point_c STRING
+) WITH (
+    'connector' = 'mysql-cdc-inlong',
+    'hostname' = 'mysql',
+    'port' = '3306',
+    'username' = 'inlong',
+    'password' = 'inlong',
+    'database-name' = 'test',
+    'table-name' = 'test_input',
+    'append-mode' = 'true',
+    'scan.incremental.snapshot.chunk.size' = '4',
+    'scan.incremental.snapshot.enabled' = 'false'
+);
+
+CREATE TABLE kafka_load (
+    `id` INT NOT NULL,
+    name STRING,
+    description STRING,
+    weight DECIMAL(10,3),
+    enum_c STRING,
+    json_c STRING,
+    point_c STRING
+) WITH (
+    'connector' = 'kafka-inlong',
+    'topic' = 'test-topic',
+    'properties.bootstrap.servers' = 'kafka:9092',
+    'format' = 'csv'
+);
+
+CREATE TABLE kafka_extract (
+    `id` INT NOT NULL,
+    name STRING,
+    description STRING,
+    weight DECIMAL(10,3),
+    enum_c STRING,
+    json_c STRING,
+    point_c STRING
+) WITH (
+    'connector' = 'kafka-inlong',
+    'topic' = 'test-topic',
+    'properties.bootstrap.servers' = 'kafka:9092',
+    'properties.group.id' = 'testGroup',
+    'scan.startup.mode' = 'earliest-offset',
+    'format' = 'csv'
+);
+
+CREATE TABLE test_output (
+    `id` INT NOT NULL,
+    name STRING,
+    description STRING,
+    weight DECIMAL(10,3),
+    enum_c STRING,
+    json_c STRING,
+    point_c STRING
+) WITH (
+    'connector' = 'jdbc-inlong',
+    'url' = 'jdbc:mysql://mysql:3306/test',
+    'table-name' = 'test_output',
+    'username' = 'inlong',
+    'password' = 'inlong'
+);
+
+INSERT INTO kafka_load select * from test_input;
+INSERT INTO test_output select * from kafka_extract;
+
+
+
+
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/log4j2-test.properties b/inlong-sort/sort-end-to-end-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..9f14c92cb
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+rootLogger=INFO, STDOUT
+
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
+
+appender.jm.type = File
+appender.jm.name = jobmanager
+appender.jm.fileName = target/logs/jobmanager.log
+appender.jm.layout.type = PatternLayout
+appender.jm.layout.pattern = - %m%n
+
+appender.tm.type = File
+appender.tm.name = taskmanager
+appender.tm.fileName = target/logs/taskmanager.log
+appender.tm.layout.type = PatternLayout
+appender.tm.layout.pattern = - %m%n
+
+appender.kafka.type = File
+appender.kafka.name = kafkaserver
+appender.kafka.fileName = target/logs/kafka.log
+appender.kafka.layout.type = PatternLayout
+appender.kafka.layout.pattern = - %m%n
+
+appender.mysql.type = File
+appender.mysql.name = mysql
+appender.mysql.fileName = target/logs/mysql.log
+appender.mysql.layout.type = PatternLayout
+appender.mysql.layout.pattern = - %m%n
+
+
+logger.jm=INFO, jobmanager
+logger.jm.name=org.apache.flink.runtime.jobmaster.JobMaster
+logger.jm.additivity=false
+
+logger.tm=INFO, taskmanager
+logger.tm.name=org.apache.flink.runtime.taskexecutor.TaskExecutor
+logger.tm.additivity=false
+
+logger.mysql=INFO, mysql
+logger.mysql.name=org.apache.inlong.sort.tests.utils.MySqlContainer
+logger.mysql.additivity=false
+
+logger.kafka=INFO, kafkaserver
+logger.kafka.name=org.apache.inlong.sort.tests.KafkaE2ECase
+logger.kafka.additivity=false
+
diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE
index 432bbe8dc..2988d52aa 100644
--- a/licenses/inlong-sort/LICENSE
+++ b/licenses/inlong-sort/LICENSE
@@ -548,4 +548,3 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
   org.checkerframework:checker-qual:3.12.0 - Checker Qual (https://checkerframework.org), (The MIT License)
   org.projectlombok:lombok:1.18.22 - Lombok (https://projectlombok.org), (The MIT License)
   org.slf4j:slf4j-api:1.7.36 - SLF4J API Module (http://www.slf4j.org), (MIT License)
-
diff --git a/pom.xml b/pom.xml
index b99707ead..552d51100 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,7 @@
 
         <checkstyle.version>6.19</checkstyle.version>
         <dockerfile.maven.version>1.4.13</dockerfile.maven.version>
+        <testcontainers.version>1.17.2</testcontainers.version>
         <docker.organization>inlong</docker.organization>
 
         <netty.version>4.1.72.Final</netty.version>
@@ -1451,6 +1452,30 @@
                 <scope>test</scope>
                 <version>${awaitility.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>testcontainers</artifactId>
+                <version>${testcontainers.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>mysql</artifactId>
+                <version>${testcontainers.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>jdbc</artifactId>
+                <version>${testcontainers.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>kafka</artifactId>
+                <version>${testcontainers.version}</version>
+                <scope>test</scope>
+            </dependency>
 
             <dependency>
                 <groupId>org.hamcrest</groupId>
@@ -1669,6 +1694,8 @@
                         <exclude>
                             **/manager-service/src/test/resources/plugins/manager-plugin-examples.jar
                         </exclude>
+                        <!-- Test case: Flink SQL scripts for the end2end test cases; comment statements have limited support there -->
+                        <exclude>**/resources/flinkSql/**</exclude>
 
                         <!-- Referenced 3rd codes -->
                         <exclude>**/resources/assets/lib/**</exclude>