You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/23 06:19:51 UTC
[inlong] branch master updated: [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 99189beda [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627)
99189beda is described below
commit 99189bedacca354e5244be251235a32ad8ccf1e2
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Aug 23 14:19:44 2022 +0800
[INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627)
Co-authored-by: thesumery <15...@qq.com>
---
.../org/apache/inlong/sort/tests/KafkaE2ECase.java | 175 ++++---
.../sort/tests/utils/FlinkContainerTestEnv.java | 38 +-
.../sort/tests/utils/PlaceholderResolver.java | 150 ++++++
.../apache/inlong/sort/tests/utils/TestUtils.java | 16 +
.../test/resources/env/kafka_test_kafka_init.txt | 1 +
.../test/resources/env/kafka_test_mysql_init.txt | 19 +
.../src/test/resources/groupFile/kafka_test.json | 562 +++++++++++++++++++++
7 files changed, 894 insertions(+), 67 deletions(-)
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
index 2f9b2128f..2f82b7ac2 100644
--- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
@@ -20,9 +20,9 @@ package org.apache.inlong.sort.tests;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
import org.apache.inlong.sort.tests.utils.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
@@ -34,6 +34,8 @@ import org.testcontainers.utility.DockerImageName;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
@@ -41,9 +43,10 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* End-to-end tests for sort-connector-kafka uber jar.
@@ -56,17 +59,6 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
// Can't use getResource("xxx").getPath(), windows will don't know that path
- private static final String sqlFile;
-
- static {
- try {
- sqlFile = Paths.get(KafkaE2ECase.class.getResource("/flinkSql/kafka_test.sql").toURI()).toString();
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static final String TOPIC = "test-topic";
@ClassRule
public static final KafkaContainer KAFKA =
@@ -76,33 +68,47 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
.withEmbeddedZookeeper()
.withLogConsumer(new Slf4jLogConsumer(LOG));
- @Before
- public void setup() {
- initializeMysqlTable();
- initializeKafkaTable();
- }
-
- @After
- public void teardown() {
+ @AfterClass
+ public static void teardown() {
if (KAFKA != null) {
KAFKA.stop();
}
}
- private void initializeKafkaTable() {
- List<String> commands = new ArrayList<>();
- commands.add("kafka-topics");
- commands.add("--create");
- commands.add("--topic");
- commands.add(TOPIC);
- commands.add("--replication-factor 1");
- commands.add("--partitions 1");
- commands.add("--zookeeper");
- commands.add("localhost:" + KafkaContainer.ZOOKEEPER_PORT);
+ private Path getSql(String fileName, Map<String, Object> properties) {
+ try {
+ Path file = Paths.get(KafkaE2ECase.class.getResource("/flinkSql/" + fileName).toURI());
+ return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Path getGroupFile(String fileName, Map<String, Object> properties) {
try {
- LOG.info(String.join(" ", commands));
- ExecResult result = KAFKA.execInContainer("bash", "-c", String.join(" ", commands));
- LOG.info(result.getStdout());
+ Path file = Paths.get(KafkaE2ECase.class.getResource("/groupFile/" + fileName).toURI());
+ return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getCreateStatement(String fileName, Map<String, Object> properties) {
+ try {
+ Path file = Paths.get(KafkaE2ECase.class.getResource("/env/" + fileName).toURI());
+ return PlaceholderResolver.getDefaultResolver().resolveByMap(
+ new String(Files.readAllBytes(file), StandardCharsets.UTF_8),
+ properties);
+ } catch (IOException | URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void initializeKafkaTable(String fileName, Map<String, Object> properties) {
+ try {
+ String createKafkaStatement = getCreateStatement(fileName, properties);
+ ExecResult result = KAFKA.execInContainer("bash", "-c", createKafkaStatement);
+ LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement, result.getStdout());
if (result.getExitCode() != 0) {
throw new RuntimeException("Init kafka topic failed. Exit code:" + result.getExitCode());
}
@@ -111,43 +117,40 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
}
}
- private void initializeMysqlTable() {
+ private void initializeMysqlTable(String fileName, Map<String, Object> properties) {
try (Connection conn =
DriverManager.getConnection(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword());
Statement stat = conn.createStatement()) {
- stat.execute(
- "CREATE TABLE test_input (\n"
- + " id INTEGER NOT NULL,\n"
- + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
- + " description VARCHAR(512),\n"
- + " weight FLOAT,\n"
- + " enum_c enum('red', 'white') default 'red', -- test some complex types as well,\n"
- + " json_c JSON, -- because we use additional dependencies to deserialize complex types.\n"
- + " point_c POINT\n"
- + ");");
- stat.execute(
- "CREATE TABLE test_output (\n"
- + " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n"
- + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
- + " description VARCHAR(512),\n"
- + " weight FLOAT,\n"
- + " enum_c VARCHAR(255),\n"
- + " json_c VARCHAR(255),\n"
- + " point_c VARCHAR(255)\n"
- + ");");
+ String createMysqlStatement = getCreateStatement(fileName, properties);
+ stat.execute(createMysqlStatement);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
-
/**
* Test flink sql mysql cdc to hive
*
* @throws Exception The exception may throws when execute the case
*/
@Test
- public void testKafka() throws Exception {
+ public void testKafkaWithSqlFile() throws Exception {
+ final String topic = "test-topic";
+ final String mysqlInputTable = "test_input";
+ final String mysqlOutputTable = "test_output";
+ initializeMysqlTable("kafka_test_mysql_init.txt", new HashMap() {
+ {
+ put("MYSQL_INPUT_TABLE", mysqlInputTable);
+ put("MYSQL_OUTPUT_TABLE", mysqlOutputTable);
+ }
+ });
+ initializeKafkaTable("kafka_test_kafka_init.txt", new HashMap() {
+ {
+ put("TOPIC", topic);
+ put("ZOOKEEPER_PORT", KafkaContainer.ZOOKEEPER_PORT);
+ }
+ });
+ String sqlFile = getSql("kafka_test.sql", new HashMap<>()).toString();
submitSQLJob(sqlFile, kafkaJar, jdbcJar, mysqlJar, mysqlJdbcJar);
waitUntilJobRunning(Duration.ofSeconds(30));
@@ -179,4 +182,60 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
60000L);
}
+ @Test
+ public void testKafkaWithGroupFile() throws Exception {
+ final String topic = "test_topic_for_group_file";
+ final String mysqlInputTable = "test_input_for_group_file";
+ final String mysqlOutputTable = "test_output_for_group_file";
+ initializeMysqlTable("kafka_test_mysql_init.txt", new HashMap() {
+ {
+ put("MYSQL_INPUT_TABLE", mysqlInputTable);
+ put("MYSQL_OUTPUT_TABLE", mysqlOutputTable);
+ }
+ });
+ initializeKafkaTable("kafka_test_kafka_init.txt", new HashMap() {
+ {
+ put("TOPIC", topic);
+ put("ZOOKEEPER_PORT", KafkaContainer.ZOOKEEPER_PORT);
+ }
+ });
+ String groupFile = getGroupFile("kafka_test.json", new HashMap() {
+ {
+ put("MYSQL_INPUT_TABLE", mysqlInputTable);
+ put("MYSQL_OUTPUT_TABLE", mysqlOutputTable);
+ put("TOPIC", topic);
+ put("ZOOKEEPER_PORT", KafkaContainer.ZOOKEEPER_PORT);
+ }
+ }).toString();
+ submitGroupFileJob(groupFile, kafkaJar, jdbcJar, mysqlJar, mysqlJdbcJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+
+ // generate input
+ try (Connection conn =
+ DriverManager.getConnection(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword());
+ Statement stat = conn.createStatement()) {
+ stat.execute(
+ "INSERT INTO test_input_for_group_file "
+ + "VALUES (1,'jacket','water resistent white wind breaker',0.2, null, null, null);");
+ stat.execute(
+ "INSERT INTO test_input_for_group_file "
+ + "VALUES (2,'scooter','Big 2-wheel scooter ',5.18, null, null, null);");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ // validate output
+ JdbcProxy proxy =
+ new JdbcProxy(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword(), MYSQL_DRIVER_CLASS);
+ List<String> expectResult =
+ Arrays.asList(
+ "1,jacket,water resistent white wind breaker,0.2,null,null,null",
+ "2,scooter,Big 2-wheel scooter ,5.18,null,null,null");
+ proxy.checkResultWithTimeout(
+ expectResult,
+ mysqlOutputTable,
+ 7,
+ 60000L);
+ }
}
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index b7eba6c4c..914ac7285 100644
--- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
@@ -102,10 +102,10 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@Nullable
- private RestClusterClient<StandaloneClusterId> restClusterClient;
+ private static RestClusterClient<StandaloneClusterId> restClusterClient;
- private GenericContainer<?> jobManager;
- private GenericContainer<?> taskManager;
+ private static GenericContainer<?> jobManager;
+ private static GenericContainer<?> taskManager;
// ----------------------------------------------------------------------------------------
@@ -121,12 +121,13 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
.withDatabaseName("test")
.withUsername("flinkuser")
.withPassword("flinkpw")
+ .withUrlParam("allowMultiQueries", "true")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG));
- @Before
- public void before() {
+ @BeforeClass
+ public static void before() {
LOG.info("Starting containers...");
jobManager =
new GenericContainer<>("flink:1.13.5-scala_2.11")
@@ -151,8 +152,8 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
LOG.info("Containers are started.");
}
- @After
- public void after() {
+ @AfterClass
+ public static void after() {
if (restClusterClient != null) {
restClusterClient.close();
}
@@ -191,6 +192,25 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
}
}
+ public void submitGroupFileJob(String groupFile, Path... jars)
+ throws IOException, InterruptedException {
+ final List<String> commands = new ArrayList<>();
+ String containerGroupFile = copyToContainerTmpPath(jobManager, groupFile);
+ commands.add(FLINK_BIN + "/flink run -d");
+ commands.add("-c org.apache.inlong.sort.Entrance");
+ commands.add(copyToContainerTmpPath(jobManager, constructDistJar(jars)));
+ commands.add("--group.info.file");
+ commands.add(containerGroupFile);
+
+ ExecResult execResult =
+ jobManager.execInContainer("bash", "-c", String.join(" ", commands));
+ LOG.info(execResult.getStdout());
+ LOG.error(execResult.getStderr());
+ if (execResult.getExitCode() != 0) {
+ throw new AssertionError("Failed when submitting the SQL job.");
+ }
+ }
+
/**
* Get {@link RestClusterClient} connected to this FlinkContainer.
*
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java
new file mode 100644
index 000000000..277124fe0
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A file placeholder replacement tool.
+ */
+public class PlaceholderResolver {
+ /**
+ * Default placeholder prefix
+ */
+ public static final String DEFAULT_PLACEHOLDER_PREFIX = "${";
+
+ /**
+ * Default placeholder suffix
+ */
+ public static final String DEFAULT_PLACEHOLDER_SUFFIX = "}";
+
+ /**
+ * Default singleton resolver
+ */
+ private static PlaceholderResolver defaultResolver = new PlaceholderResolver();
+
+ /**
+ * Placeholder prefix
+ */
+ private String placeholderPrefix = DEFAULT_PLACEHOLDER_PREFIX;
+
+ /**
+ * Placeholder suffix
+ */
+ private String placeholderSuffix = DEFAULT_PLACEHOLDER_SUFFIX;
+
+ private PlaceholderResolver() {
+
+ }
+
+ private PlaceholderResolver(String placeholderPrefix, String placeholderSuffix) {
+ this.placeholderPrefix = placeholderPrefix;
+ this.placeholderSuffix = placeholderSuffix;
+ }
+
+ public static PlaceholderResolver getDefaultResolver() {
+ return defaultResolver;
+ }
+
+ public static PlaceholderResolver getResolver(String placeholderPrefix, String placeholderSuffix) {
+ return new PlaceholderResolver(placeholderPrefix, placeholderSuffix);
+ }
+
+ /**
+ * Replace template string with special placeholder according to replace function.
+ * @param content template string with special placeholder
+ * @param rule placeholder replacement rule
+ * @return new replaced string
+ */
+ public String resolveByRule(String content, Function<String, String> rule) {
+ int start = content.indexOf(this.placeholderPrefix);
+ if (start == -1) {
+ return content;
+ }
+ StringBuilder result = new StringBuilder(content);
+ while (start != -1) {
+ int end = result.indexOf(this.placeholderSuffix, start);
+ // get placeholder actual value (e.g. ${id}, get the value represent id)
+ String placeholder = result.substring(start + this.placeholderPrefix.length(), end);
+ // replace placeholder value
+ String replaceContent = placeholder.trim().isEmpty() ? "" : rule.apply(placeholder);
+ result.replace(start, end + this.placeholderSuffix.length(), replaceContent);
+ start = result.indexOf(this.placeholderPrefix, start + replaceContent.length());
+ }
+ return result.toString();
+ }
+
+ /**
+ * Replace the placeholders in every line of a template file according to the replace function.
+ * @param file template file with special placeholder
+ * @param rule placeholder replacement rule
+ * @return path of the newly written file (the original file name with "$" appended, in the same directory)
+ */
+ public Path resolveByRule(Path file, Function<String, String> rule) {
+ try {
+ List<String> newContents = Files.readAllLines(file, StandardCharsets.UTF_8)
+ .stream()
+ .map(content -> resolveByRule(content, rule))
+ .collect(Collectors.toList());
+ Path newPath = Paths.get(file.getParent().toString(), file.getFileName() + "$");
+ Files.write(newPath, String.join(System.lineSeparator(), newContents).getBytes(StandardCharsets.UTF_8));
+ return newPath;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Replace the placeholders in a template string with values looked up in a map.
+ * The key is the text between the placeholder prefix and suffix. <br/><br/>
+ * e.g: content = product:${id}:detail:${did}<br/>
+ * valueMap = id -> 1; did -> 2<br/>
+ * return: product:1:detail:2<br/>
+ *
+ * @param content template string with special placeholder
+ * @param valueMap placeholder replacement map
+ * @return new replaced string
+ */
+ public String resolveByMap(String content, final Map<String, Object> valueMap) {
+ return resolveByRule(content, placeholderValue -> String.valueOf(valueMap.get(placeholderValue)));
+ }
+
+ /**
+ * Replace the placeholders in a template file with values looked up in a map.
+ * The key is the text between the placeholder prefix and suffix. <br/><br/>
+ * e.g: content = product:${id}:detail:${did}<br/>
+ * valueMap = id -> 1; did -> 2<br/>
+ * return: product:1:detail:2<br/>
+ *
+ * @param file template file with special placeholder
+ * @param valueMap placeholder replacement map
+ * @return path of the newly written file holding the replaced content
+ */
+ public Path resolveByMap(Path file, final Map<String, Object> valueMap) {
+ return resolveByRule(file, placeholderValue -> String.valueOf(valueMap.get(placeholderValue)));
+ }
+}
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java
index beeb5edd9..a73d7afec 100644
--- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java
@@ -18,17 +18,23 @@
package org.apache.inlong.sort.tests.utils;
+import org.junit.Test;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.junit.Assert.assertEquals;
+
/**
* Test util for test container.
*/
@@ -106,4 +112,14 @@ public class TestUtils {
return value == null ? defaultValue : converter.apply(value);
}
}
+
+ @Test
+ public void testReplaceholder() {
+ String before = "today is ${date}, today weather is ${weather}";
+ Map<String, Object> maps = new HashMap<>();
+ maps.put("date", "2022.08.05");
+ maps.put("weather", "rain");
+ String after = PlaceholderResolver.getDefaultResolver().resolveByMap(before, maps);
+ assertEquals(after, "today is 2022.08.05, today weather is rain");
+ }
}
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt
new file mode 100644
index 000000000..b2f31d78f
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt
@@ -0,0 +1 @@
+kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT}
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt
new file mode 100644
index 000000000..c7b1948a1
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt
@@ -0,0 +1,19 @@
+CREATE TABLE ${MYSQL_INPUT_TABLE} (
+ id INTEGER NOT NULL,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ description VARCHAR(512),
+ weight FLOAT,
+ enum_c enum('red', 'white') default 'red', -- test some complex types as well,
+ json_c JSON, -- because we use additional dependencies to deserialize complex types.
+ point_c POINT
+);
+
+CREATE TABLE ${MYSQL_OUTPUT_TABLE} (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ description VARCHAR(512),
+ weight FLOAT,
+ enum_c VARCHAR(255),
+ json_c VARCHAR(255),
+ point_c VARCHAR(255)
+);
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json b/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json
new file mode 100644
index 000000000..54931a186
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json
@@ -0,0 +1,562 @@
+{
+ "groupId": "t323a2b0f-614e-4b04-82d4-3880427bc3e8",
+ "streams": [
+ {
+ "streamId": "1",
+ "nodes": [
+ {
+ "type": "mysqlExtract",
+ "id": "1",
+ "name": "mysql_input",
+ "fields": [
+ {
+ "type": "field",
+ "name": "id",
+ "formatInfo": {
+ "type": "int"
+ }
+ },
+ {
+ "type": "field",
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "description",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "weight",
+ "formatInfo": {
+ "type": "decimal",
+ "precision": 10,
+ "scale": 3
+ }
+ },
+ {
+ "type": "field",
+ "name": "enum_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "json_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "point_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ ],
+ "primaryKey": "id",
+ "tableNames": [
+ "${MYSQL_INPUT_TABLE}"
+ ],
+ "hostname": "mysql",
+ "username": "inlong",
+ "password": "inlong",
+ "database": "test",
+ "port": 3306,
+ "incrementalSnapshotEnabled": false,
+ "serverTimeZone": "Asia/Shanghai"
+ },
+ {
+ "type": "kafkaLoad",
+ "id": "2",
+ "name": "kafka_load",
+ "fields": [
+ {
+ "type": "field",
+ "name": "id",
+ "formatInfo": {
+ "type": "int"
+ }
+ },
+ {
+ "type": "field",
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "description",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "weight",
+ "formatInfo": {
+ "type": "decimal",
+ "precision": 10,
+ "scale": 3
+ }
+ },
+ {
+ "type": "field",
+ "name": "enum_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "json_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "point_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ ],
+ "fieldRelations": [
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "id",
+ "formatInfo": {
+ "type": "int"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "id",
+ "formatInfo": {
+ "type": "int"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "description",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "description",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "weight",
+ "formatInfo": {
+ "type": "decimal",
+ "precision": 10,
+ "scale": 3
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "weight",
+ "formatInfo": {
+ "type": "decimal",
+ "precision": 10,
+ "scale": 3
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "enum_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "enum_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "json_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "json_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "point_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "point_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ }
+ ],
+ "primaryKey": "id",
+ "topic": "${TOPIC}",
+ "bootstrapServers": "kafka:9092",
+ "format": {
+ "type": "canalJsonFormat",
+ "ignoreParseErrors": true,
+ "timestampFormatStandard": "SQL",
+ "mapNullKeyMode": "DROP",
+ "mapNullKeyLiteral": "null",
+ "encodeDecimalAsPlainNumber": true
+ },
+ "sinkParallelism": 1,
+ "properties": {}
+ }
+ ],
+ "relations": [
+ {
+ "type": "baseRelation",
+ "inputs": [
+ "1"
+ ],
+ "outputs": [
+ "2"
+ ]
+ }
+ ]
+ },
+ {
+ "streamId": "2",
+ "nodes": [
+ {
+ "type": "kafkaExtract",
+ "id": "3",
+ "name": "kafka_extract",
+ "fields": [
+ {
+ "type": "field",
+ "name": "id",
+ "formatInfo": {
+ "type": "int"
+ }
+ },
+ {
+ "type": "field",
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "description",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "weight",
+ "formatInfo": {
+ "type": "decimal",
+ "precision": 10,
+ "scale": 3
+ }
+ },
+ {
+ "type": "field",
+ "name": "enum_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "json_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "point_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ ],
+ "primaryKey": "id",
+ "topic": "${TOPIC}",
+ "bootstrapServers": "kafka:9092",
+ "format": {
+ "type": "canalJsonFormat",
+ "ignoreParseErrors": true,
+ "timestampFormatStandard": "SQL",
+ "mapNullKeyMode": "DROP",
+ "mapNullKeyLiteral": "null",
+ "encodeDecimalAsPlainNumber": true
+ },
+ "scanStartupMode": "EARLIEST_OFFSET",
+ "groupId": null,
+ "scanSpecificOffsets": null
+ },
+ {
+ "type": "mysqlLoad",
+ "id": "4",
+ "name": "mysql_output",
+ "fields": [
+ {
+ "type": "field",
+ "name": "id",
+ "formatInfo": {
+ "type": "int"
+ }
+ },
+ {
+ "type": "field",
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "description",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "weight",
+ "formatInfo": {
+ "type": "decimal",
+ "precision": 10,
+ "scale": 3
+ }
+ },
+ {
+ "type": "field",
+ "name": "enum_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "json_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "field",
+ "name": "point_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ ],
+ "fieldRelations": [
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "id",
+ "formatInfo": {
+ "type": "int"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "id",
+ "formatInfo": {
+ "type": "int"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "description",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "description",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "weight",
+ "formatInfo": {
+ "type": "decimal",
+ "precision": 10,
+ "scale": 3
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "weight",
+ "formatInfo": {
+ "type": "decimal",
+ "precision": 10,
+ "scale": 3
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "enum_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "enum_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "json_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "json_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ },
+ {
+ "type": "fieldRelation",
+ "inputField": {
+ "type": "field",
+ "name": "point_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ "outputField": {
+ "type": "field",
+ "name": "point_c",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ }
+ ],
+ "url": "jdbc:mysql://mysql:3306/test",
+ "username": "inlong",
+ "password": "inlong",
+ "tableName": "${MYSQL_OUTPUT_TABLE}",
+ "primaryKey": "id"
+ }
+ ],
+ "relations": [
+ {
+ "type": "baseRelation",
+ "inputs": [
+ "3"
+ ],
+ "outputs": [
+ "4"
+ ]
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file