Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/23 06:19:51 UTC

[inlong] branch master updated: [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 99189beda [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627)
99189beda is described below

commit 99189bedacca354e5244be251235a32ad8ccf1e2
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Aug 23 14:19:44 2022 +0800

    [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../org/apache/inlong/sort/tests/KafkaE2ECase.java | 175 ++++---
 .../sort/tests/utils/FlinkContainerTestEnv.java    |  38 +-
 .../sort/tests/utils/PlaceholderResolver.java      | 150 ++++++
 .../apache/inlong/sort/tests/utils/TestUtils.java  |  16 +
 .../test/resources/env/kafka_test_kafka_init.txt   |   1 +
 .../test/resources/env/kafka_test_mysql_init.txt   |  19 +
 .../src/test/resources/groupFile/kafka_test.json   | 562 +++++++++++++++++++++
 7 files changed, 894 insertions(+), 67 deletions(-)

diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
index 2f9b2128f..2f82b7ac2 100644
--- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
@@ -20,9 +20,9 @@ package org.apache.inlong.sort.tests;
 
 import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
 import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
 import org.apache.inlong.sort.tests.utils.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -34,6 +34,8 @@ import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Connection;
@@ -41,9 +43,10 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * End-to-end tests for sort-connector-kafka uber jar.
@@ -56,17 +59,6 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
     private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
     private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
     // Can't use getResource("xxx").getPath(); Windows cannot resolve that kind of path
-    private static final String sqlFile;
-
-    static {
-        try {
-            sqlFile = Paths.get(KafkaE2ECase.class.getResource("/flinkSql/kafka_test.sql").toURI()).toString();
-        } catch (URISyntaxException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static final String TOPIC = "test-topic";
 
     @ClassRule
     public static final KafkaContainer KAFKA =
@@ -76,33 +68,47 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
                     .withEmbeddedZookeeper()
                     .withLogConsumer(new Slf4jLogConsumer(LOG));
 
-    @Before
-    public void setup() {
-        initializeMysqlTable();
-        initializeKafkaTable();
-    }
-
-    @After
-    public void teardown() {
+    @AfterClass
+    public static void teardown() {
         if (KAFKA != null) {
             KAFKA.stop();
         }
     }
 
-    private void initializeKafkaTable() {
-        List<String> commands = new ArrayList<>();
-        commands.add("kafka-topics");
-        commands.add("--create");
-        commands.add("--topic");
-        commands.add(TOPIC);
-        commands.add("--replication-factor 1");
-        commands.add("--partitions 1");
-        commands.add("--zookeeper");
-        commands.add("localhost:" + KafkaContainer.ZOOKEEPER_PORT);
+    private Path getSql(String fileName, Map<String, Object> properties) {
+        try {
+            Path file = Paths.get(KafkaE2ECase.class.getResource("/flinkSql/" + fileName).toURI());
+            return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties);
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Path getGroupFile(String fileName, Map<String, Object> properties) {
         try {
-            LOG.info(String.join(" ", commands));
-            ExecResult result = KAFKA.execInContainer("bash", "-c", String.join(" ", commands));
-            LOG.info(result.getStdout());
+            Path file = Paths.get(KafkaE2ECase.class.getResource("/groupFile/" + fileName).toURI());
+            return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties);
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private String getCreateStatement(String fileName, Map<String, Object> properties) {
+        try {
+            Path file = Paths.get(KafkaE2ECase.class.getResource("/env/" + fileName).toURI());
+            return PlaceholderResolver.getDefaultResolver().resolveByMap(
+                    new String(Files.readAllBytes(file), StandardCharsets.UTF_8),
+                    properties);
+        } catch (IOException | URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void initializeKafkaTable(String fileName, Map<String, Object> properties) {
+        try {
+            String createKafkaStatement = getCreateStatement(fileName, properties);
+            ExecResult result = KAFKA.execInContainer("bash", "-c", createKafkaStatement);
+            LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement, result.getStdout());
             if (result.getExitCode() != 0) {
                 throw new RuntimeException("Init kafka topic failed. Exit code:" + result.getExitCode());
             }
@@ -111,43 +117,40 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
         }
     }
 
-    private void initializeMysqlTable() {
+    private void initializeMysqlTable(String fileName, Map<String, Object> properties) {
         try (Connection conn =
                 DriverManager.getConnection(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword());
                 Statement stat = conn.createStatement()) {
-            stat.execute(
-                    "CREATE TABLE test_input (\n"
-                            + "  id INTEGER NOT NULL,\n"
-                            + "  name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
-                            + "  description VARCHAR(512),\n"
-                            + "  weight FLOAT,\n"
-                            + "  enum_c enum('red', 'white') default 'red',  -- test some complex types as well,\n"
-                            + "  json_c JSON, -- because we use additional dependencies to deserialize complex types.\n"
-                            + "  point_c POINT\n"
-                            + ");");
-            stat.execute(
-                    "CREATE TABLE test_output (\n"
-                            + "  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n"
-                            + "  name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
-                            + "  description VARCHAR(512),\n"
-                            + "  weight FLOAT,\n"
-                            + "  enum_c VARCHAR(255),\n"
-                            + "  json_c VARCHAR(255),\n"
-                            + "  point_c VARCHAR(255)\n"
-                            + ");");
+            String createMysqlStatement = getCreateStatement(fileName, properties);
+            stat.execute(createMysqlStatement);
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
     }
 
-
     /**
      * Test flink sql mysql cdc to kafka to mysql.
      *
      * @throws Exception The exception may be thrown when executing the case
      */
     @Test
-    public void testKafka() throws Exception {
+    public void testKafkaWithSqlFile() throws Exception {
+        final String topic = "test-topic";
+        final String mysqlInputTable = "test_input";
+        final String mysqlOutputTable = "test_output";
+        initializeMysqlTable("kafka_test_mysql_init.txt", new HashMap() {
+            {
+                put("MYSQL_INPUT_TABLE", mysqlInputTable);
+                put("MYSQL_OUTPUT_TABLE", mysqlOutputTable);
+            }
+        });
+        initializeKafkaTable("kafka_test_kafka_init.txt", new HashMap() {
+            {
+                put("TOPIC", topic);
+                put("ZOOKEEPER_PORT", KafkaContainer.ZOOKEEPER_PORT);
+            }
+        });
+        String sqlFile = getSql("kafka_test.sql", new HashMap<>()).toString();
         submitSQLJob(sqlFile, kafkaJar, jdbcJar, mysqlJar, mysqlJdbcJar);
         waitUntilJobRunning(Duration.ofSeconds(30));
 
@@ -179,4 +182,60 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
                 60000L);
     }
 
+    @Test
+    public void testKafkaWithGroupFile() throws Exception {
+        final String topic = "test_topic_for_group_file";
+        final String mysqlInputTable = "test_input_for_group_file";
+        final String mysqlOutputTable = "test_output_for_group_file";
+        initializeMysqlTable("kafka_test_mysql_init.txt", new HashMap() {
+            {
+                put("MYSQL_INPUT_TABLE", mysqlInputTable);
+                put("MYSQL_OUTPUT_TABLE", mysqlOutputTable);
+            }
+        });
+        initializeKafkaTable("kafka_test_kafka_init.txt", new HashMap() {
+            {
+                put("TOPIC", topic);
+                put("ZOOKEEPER_PORT", KafkaContainer.ZOOKEEPER_PORT);
+            }
+        });
+        String groupFile = getGroupFile("kafka_test.json", new HashMap() {
+            {
+                put("MYSQL_INPUT_TABLE", mysqlInputTable);
+                put("MYSQL_OUTPUT_TABLE", mysqlOutputTable);
+                put("TOPIC", topic);
+                put("ZOOKEEPER_PORT", KafkaContainer.ZOOKEEPER_PORT);
+            }
+        }).toString();
+        submitGroupFileJob(groupFile, kafkaJar, jdbcJar, mysqlJar, mysqlJdbcJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+
+        // generate input
+        try (Connection conn =
+                DriverManager.getConnection(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword());
+                Statement stat = conn.createStatement()) {
+            stat.execute(
+                    "INSERT INTO test_input_for_group_file "
+                            + "VALUES (1,'jacket','water resistent white wind breaker',0.2, null, null, null);");
+            stat.execute(
+                    "INSERT INTO test_input_for_group_file "
+                            + "VALUES (2,'scooter','Big 2-wheel scooter ',5.18, null, null, null);");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        // validate output
+        JdbcProxy proxy =
+                new JdbcProxy(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword(), MYSQL_DRIVER_CLASS);
+        List<String> expectResult =
+                Arrays.asList(
+                        "1,jacket,water resistent white wind breaker,0.2,null,null,null",
+                        "2,scooter,Big 2-wheel scooter ,5.18,null,null,null");
+        proxy.checkResultWithTimeout(
+                expectResult,
+                mysqlOutputTable,
+                7,
+                60000L);
+    }
 }
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index b7eba6c4c..914ac7285 100644
--- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
@@ -102,10 +102,10 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
     public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
     @Nullable
-    private RestClusterClient<StandaloneClusterId> restClusterClient;
+    private static RestClusterClient<StandaloneClusterId> restClusterClient;
 
-    private GenericContainer<?> jobManager;
-    private GenericContainer<?> taskManager;
+    private static GenericContainer<?> jobManager;
+    private static GenericContainer<?> taskManager;
 
 
     // ----------------------------------------------------------------------------------------
@@ -121,12 +121,13 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
                     .withDatabaseName("test")
                     .withUsername("flinkuser")
                     .withPassword("flinkpw")
+                    .withUrlParam("allowMultiQueries", "true")
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
                     .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG));
 
-    @Before
-    public void before() {
+    @BeforeClass
+    public static void before() {
         LOG.info("Starting containers...");
         jobManager =
                 new GenericContainer<>("flink:1.13.5-scala_2.11")
@@ -151,8 +152,8 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
         LOG.info("Containers are started.");
     }
 
-    @After
-    public void after() {
+    @AfterClass
+    public static void after() {
         if (restClusterClient != null) {
             restClusterClient.close();
         }
@@ -191,6 +192,25 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
         }
     }
 
+    public void submitGroupFileJob(String groupFile, Path... jars)
+            throws IOException, InterruptedException {
+        final List<String> commands = new ArrayList<>();
+        String containerGroupFile = copyToContainerTmpPath(jobManager, groupFile);
+        commands.add(FLINK_BIN + "/flink run -d");
+        commands.add("-c org.apache.inlong.sort.Entrance");
+        commands.add(copyToContainerTmpPath(jobManager, constructDistJar(jars)));
+        commands.add("--group.info.file");
+        commands.add(containerGroupFile);
+
+        ExecResult execResult =
+                jobManager.execInContainer("bash", "-c", String.join(" ", commands));
+        LOG.info(execResult.getStdout());
+        LOG.error(execResult.getStderr());
+        if (execResult.getExitCode() != 0) {
+            throw new AssertionError("Failed when submitting the SQL job.");
+        }
+    }
+
     /**
      * Get {@link RestClusterClient} connected to this FlinkContainer.
      *
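For orientation, submitGroupFileJob above joins its arguments into one shell command and runs it inside the JobManager container. Assuming FLINK_BIN resolves to /opt/flink/bin in the flink:1.13.5 image, and with illustrative /tmp paths standing in for the values returned by constructDistJar and copyToContainerTmpPath, the command has roughly this shape:

    /opt/flink/bin/flink run -d \
        -c org.apache.inlong.sort.Entrance \
        /tmp/sort-dist.jar \
        --group.info.file /tmp/kafka_test.json

A non-zero exit code fails the caller with an AssertionError.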
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java
new file mode 100644
index 000000000..277124fe0
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java
@@ -0,0 +1,150 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A file placeholder replacement tool.
+ */
+public class PlaceholderResolver {
+    /**
+     * Default placeholder prefix
+     */
+    public static final String DEFAULT_PLACEHOLDER_PREFIX = "${";
+
+    /**
+     * Default placeholder suffix
+     */
+    public static final String DEFAULT_PLACEHOLDER_SUFFIX = "}";
+
+    /**
+     * Default singleton resolver
+     */
+    private static PlaceholderResolver defaultResolver = new PlaceholderResolver();
+
+    /**
+     * Placeholder prefix
+     */
+    private String placeholderPrefix = DEFAULT_PLACEHOLDER_PREFIX;
+
+    /**
+     * Placeholder suffix
+     */
+    private String placeholderSuffix = DEFAULT_PLACEHOLDER_SUFFIX;
+
+    private PlaceholderResolver() {
+
+    }
+
+    private PlaceholderResolver(String placeholderPrefix, String placeholderSuffix) {
+        this.placeholderPrefix = placeholderPrefix;
+        this.placeholderSuffix = placeholderSuffix;
+    }
+
+    public static PlaceholderResolver getDefaultResolver() {
+        return defaultResolver;
+    }
+
+    public static PlaceholderResolver getResolver(String placeholderPrefix, String placeholderSuffix) {
+        return new PlaceholderResolver(placeholderPrefix, placeholderSuffix);
+    }
+
+    /**
+     * Replace placeholders in a template string according to the given replacement function.
+     * @param content  template string with placeholders
+     * @param rule  placeholder replacement rule
+     * @return the string with all placeholders replaced
+     */
+    public String resolveByRule(String content, Function<String, String> rule) {
+        int start = content.indexOf(this.placeholderPrefix);
+        if (start == -1) {
+            return content;
+        }
+        StringBuilder result = new StringBuilder(content);
+        while (start != -1) {
+            int end = result.indexOf(this.placeholderSuffix, start);
+            // extract the placeholder name (e.g. for ${id}, the name is "id")
+            String placeholder = result.substring(start + this.placeholderPrefix.length(), end);
+            // replace placeholder value
+            String replaceContent = placeholder.trim().isEmpty() ? "" : rule.apply(placeholder);
+            result.replace(start, end + this.placeholderSuffix.length(), replaceContent);
+            start = result.indexOf(this.placeholderPrefix, start + replaceContent.length());
+        }
+        return result.toString();
+    }
+
+    /**
+     * Replace placeholders in a template file according to the given replacement function.
+     * @param file  template file with placeholders
+     * @param rule  placeholder replacement rule
+     * @return path of a new file with all placeholders replaced
+     */
+    public Path resolveByRule(Path file, Function<String, String> rule) {
+        try {
+            List<String> newContents = Files.readAllLines(file, StandardCharsets.UTF_8)
+                    .stream()
+                    .map(content -> resolveByRule(content, rule))
+                    .collect(Collectors.toList());
+            Path newPath = Paths.get(file.getParent().toString(), file.getFileName() + "$");
+            Files.write(newPath, String.join(System.lineSeparator(), newContents).getBytes(StandardCharsets.UTF_8));
+            return newPath;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Replace placeholders in a template string according to a value map.
+     * Each key is the name of a placeholder. <br/><br/>
+     * e.g: content = product:${id}:detail:${did}<br/>
+     *      valueMap = id -> 1; did -> 2<br/>
+     *      return: product:1:detail:2<br/>
+     *
+     * @param content template string with special placeholder
+     * @param valueMap placeholder replacement map
+     * @return new replaced string
+     */
+    public String resolveByMap(String content, final Map<String, Object> valueMap) {
+        return resolveByRule(content, placeholderValue -> String.valueOf(valueMap.get(placeholderValue)));
+    }
+
+    /**
+     * Replace placeholders in a template file according to a value map.
+     * Each key is the name of a placeholder. <br/><br/>
+     * e.g: content = product:${id}:detail:${did}<br/>
+     *      valueMap = id -> 1; did -> 2<br/>
+     *      return: product:1:detail:2<br/>
+     *
+     * @param file template file with placeholders
+     * @param valueMap placeholder replacement map
+     * @return path of a new file with all placeholders replaced
+     */
+    public Path resolveByMap(Path file, final Map<String, Object> valueMap) {
+        return resolveByRule(file, placeholderValue -> String.valueOf(valueMap.get(placeholderValue)));
+    }
+}
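A minimal usage sketch of the resolver (the values below are illustrative). Note that resolveByMap feeds each placeholder name through String.valueOf(valueMap.get(name)), so a name missing from the map is replaced with the literal string "null":

    Map<String, Object> values = new HashMap<>();
    values.put("TOPIC", "test-topic");
    values.put("ZOOKEEPER_PORT", 2181);
    String resolved = PlaceholderResolver.getDefaultResolver().resolveByMap(
            "kafka-topics --create --topic ${TOPIC} --zookeeper localhost:${ZOOKEEPER_PORT}", values);
    // resolved == "kafka-topics --create --topic test-topic --zookeeper localhost:2181"

The Path overload writes the resolved content to a sibling file whose name is the template's name plus a trailing "$" and returns that new path.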
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java
index beeb5edd9..a73d7afec 100644
--- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java
@@ -18,17 +18,23 @@
 
 package org.apache.inlong.sort.tests.utils;
 
+import org.junit.Test;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Test util for test container.
  */
@@ -106,4 +112,14 @@ public class TestUtils {
             return value == null ? defaultValue : converter.apply(value);
         }
     }
+
+    @Test
+    public void testPlaceholderResolver() {
+        String before = "today is ${date}, today weather is ${weather}";
+        Map<String, Object> maps = new HashMap<>();
+        maps.put("date", "2022.08.05");
+        maps.put("weather", "rain");
+        String after = PlaceholderResolver.getDefaultResolver().resolveByMap(before, maps);
+        assertEquals("today is 2022.08.05, today weather is rain", after);
+    }
 }
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt
new file mode 100644
index 000000000..b2f31d78f
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt
@@ -0,0 +1 @@
+kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT}
\ No newline at end of file
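With the values the tests pass in (for example TOPIC = test_topic_for_group_file, and ZOOKEEPER_PORT = KafkaContainer.ZOOKEEPER_PORT, which is 2181 in current Testcontainers releases), the template above resolves to:

    kafka-topics --create --topic test_topic_for_group_file --replication-factor 1 --partitions 1 --zookeeper localhost:2181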
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt
new file mode 100644
index 000000000..c7b1948a1
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt
@@ -0,0 +1,19 @@
+CREATE TABLE ${MYSQL_INPUT_TABLE} (
+    id INTEGER NOT NULL,
+    name VARCHAR(255) NOT NULL DEFAULT 'flink',
+    description VARCHAR(512),
+    weight FLOAT,
+    enum_c enum('red', 'white') default 'red',  -- test some complex types as well,
+    json_c JSON, -- because we use additional dependencies to deserialize complex types.
+    point_c POINT
+);
+
+CREATE TABLE ${MYSQL_OUTPUT_TABLE} (
+    id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    name VARCHAR(255) NOT NULL DEFAULT 'flink',
+    description VARCHAR(512),
+    weight FLOAT,
+    enum_c VARCHAR(255),
+    json_c VARCHAR(255),
+    point_c VARCHAR(255)
+);
\ No newline at end of file
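Both CREATE TABLE statements above are executed through a single Statement.execute call in initializeMysqlTable; that only works because this patch also adds allowMultiQueries=true to the MySQL container's JDBC URL. A minimal sketch of the dependency (URL, credentials, and table DDL are illustrative):

    // MySQL Connector/J rejects multiple ';'-separated statements per execute()
    // unless the URL opts in with allowMultiQueries=true.
    String url = "jdbc:mysql://localhost:3306/test?allowMultiQueries=true";
    try (Connection conn = DriverManager.getConnection(url, "flinkuser", "flinkpw");
            Statement stat = conn.createStatement()) {
        stat.execute("CREATE TABLE t1 (id INT); CREATE TABLE t2 (id INT);");
    }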
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json b/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json
new file mode 100644
index 000000000..54931a186
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json
@@ -0,0 +1,562 @@
+{
+  "groupId": "t323a2b0f-614e-4b04-82d4-3880427bc3e8",
+  "streams": [
+    {
+      "streamId": "1",
+      "nodes": [
+        {
+          "type": "mysqlExtract",
+          "id": "1",
+          "name": "mysql_input",
+          "fields": [
+            {
+              "type": "field",
+              "name": "id",
+              "formatInfo": {
+                "type": "int"
+              }
+            },
+            {
+              "type": "field",
+              "name": "name",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "description",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "weight",
+              "formatInfo": {
+                "type": "decimal",
+                "precision": 10,
+                "scale": 3
+              }
+            },
+            {
+              "type": "field",
+              "name": "enum_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "json_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "point_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            }
+          ],
+          "primaryKey": "id",
+          "tableNames": [
+            "${MYSQL_INPUT_TABLE}"
+          ],
+          "hostname": "mysql",
+          "username": "inlong",
+          "password": "inlong",
+          "database": "test",
+          "port": 3306,
+          "incrementalSnapshotEnabled": false,
+          "serverTimeZone": "Asia/Shanghai"
+        },
+        {
+          "type": "kafkaLoad",
+          "id": "2",
+          "name": "kafka_load",
+          "fields": [
+            {
+              "type": "field",
+              "name": "id",
+              "formatInfo": {
+                "type": "int"
+              }
+            },
+            {
+              "type": "field",
+              "name": "name",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "description",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "weight",
+              "formatInfo": {
+                "type": "decimal",
+                "precision": 10,
+                "scale": 3
+              }
+            },
+            {
+              "type": "field",
+              "name": "enum_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "json_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "point_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            }
+          ],
+          "fieldRelations": [
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "id",
+                "formatInfo": {
+                  "type": "int"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "id",
+                "formatInfo": {
+                  "type": "int"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "name",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "name",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "description",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "description",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "weight",
+                "formatInfo": {
+                  "type": "decimal",
+                  "precision": 10,
+                  "scale": 3
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "weight",
+                "formatInfo": {
+                  "type": "decimal",
+                  "precision": 10,
+                  "scale": 3
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "enum_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "enum_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "json_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "json_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "point_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "point_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            }
+          ],
+          "primaryKey": "id",
+          "topic": "${TOPIC}",
+          "bootstrapServers": "kafka:9092",
+          "format": {
+            "type": "canalJsonFormat",
+            "ignoreParseErrors": true,
+            "timestampFormatStandard": "SQL",
+            "mapNullKeyMode": "DROP",
+            "mapNullKeyLiteral": "null",
+            "encodeDecimalAsPlainNumber": true
+          },
+          "sinkParallelism": 1,
+          "properties": {}
+        }
+      ],
+      "relations": [
+        {
+          "type": "baseRelation",
+          "inputs": [
+            "1"
+          ],
+          "outputs": [
+            "2"
+          ]
+        }
+      ]
+    },
+    {
+      "streamId": "2",
+      "nodes": [
+        {
+          "type": "kafkaExtract",
+          "id": "3",
+          "name": "kafka_extract",
+          "fields": [
+            {
+              "type": "field",
+              "name": "id",
+              "formatInfo": {
+                "type": "int"
+              }
+            },
+            {
+              "type": "field",
+              "name": "name",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "description",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "weight",
+              "formatInfo": {
+                "type": "decimal",
+                "precision": 10,
+                "scale": 3
+              }
+            },
+            {
+              "type": "field",
+              "name": "enum_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "json_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "point_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            }
+          ],
+          "primaryKey": "id",
+          "topic": "${TOPIC}",
+          "bootstrapServers": "kafka:9092",
+          "format": {
+            "type": "canalJsonFormat",
+            "ignoreParseErrors": true,
+            "timestampFormatStandard": "SQL",
+            "mapNullKeyMode": "DROP",
+            "mapNullKeyLiteral": "null",
+            "encodeDecimalAsPlainNumber": true
+          },
+          "scanStartupMode": "EARLIEST_OFFSET",
+          "groupId": null,
+          "scanSpecificOffsets": null
+        },
+        {
+          "type": "mysqlLoad",
+          "id": "4",
+          "name": "mysql_output",
+          "fields": [
+            {
+              "type": "field",
+              "name": "id",
+              "formatInfo": {
+                "type": "int"
+              }
+            },
+            {
+              "type": "field",
+              "name": "name",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "description",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "weight",
+              "formatInfo": {
+                "type": "decimal",
+                "precision": 10,
+                "scale": 3
+              }
+            },
+            {
+              "type": "field",
+              "name": "enum_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "json_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            },
+            {
+              "type": "field",
+              "name": "point_c",
+              "formatInfo": {
+                "type": "string"
+              }
+            }
+          ],
+          "fieldRelations": [
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "id",
+                "formatInfo": {
+                  "type": "int"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "id",
+                "formatInfo": {
+                  "type": "int"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "name",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "name",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "description",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "description",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "weight",
+                "formatInfo": {
+                  "type": "decimal",
+                  "precision": 10,
+                  "scale": 3
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "weight",
+                "formatInfo": {
+                  "type": "decimal",
+                  "precision": 10,
+                  "scale": 3
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "enum_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "enum_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "json_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "json_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            },
+            {
+              "type": "fieldRelation",
+              "inputField": {
+                "type": "field",
+                "name": "point_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              },
+              "outputField": {
+                "type": "field",
+                "name": "point_c",
+                "formatInfo": {
+                  "type": "string"
+                }
+              }
+            }
+          ],
+          "url": "jdbc:mysql://mysql:3306/test",
+          "username": "inlong",
+          "password": "inlong",
+          "tableName": "${MYSQL_OUTPUT_TABLE}",
+          "primaryKey": "id"
+        }
+      ],
+      "relations": [
+        {
+          "type": "baseRelation",
+          "inputs": [
+            "3"
+          ],
+          "outputs": [
+            "4"
+          ]
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file