You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/20 08:23:27 UTC

[incubator-inlong] branch master updated: [InLong-4227][Sort] Sort lightwieght support extract data to ClickHouse (#4249)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f40ecfdb [InLong-4227][Sort] Sort lightwieght support extract data to ClickHouse (#4249)
8f40ecfdb is described below

commit 8f40ecfdb539b335208becd592e66725259525d1
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Fri May 20 16:23:20 2022 +0800

    [InLong-4227][Sort] Sort lightwieght support extract data to ClickHouse (#4249)
---
 .../sort/protocol/constant/PostgresConstant.java   |   2 +-
 .../apache/inlong/sort/protocol/node/LoadNode.java |   4 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |   4 +-
 .../protocol/node/load/ClickHouseLoadNode.java     | 116 +++++++
 .../protocol/node/load/ClickHouseLoadNodeTest.java |  50 +++
 inlong-sort/sort-connectors/pom.xml                |  19 +
 .../flink/clickhouse/table/ClickHouseDialect.java  | 122 +++++++
 .../clickhouse/table/ClickHouseRowConverter.java   |  40 +++
 .../sort/flink/jdbc/table/AbstractJdbcDialect.java | 103 ++++++
 .../inlong/sort/flink/jdbc/table/JdbcDialects.java |  60 ++++
 .../flink/jdbc/table/JdbcDynamicTableFactory.java  | 384 +++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |   1 +
 inlong-sort/sort-single-tenant/pom.xml             |  20 --
 .../flink/parser/ClickHouseSqlParserTest.java      | 126 +++++++
 14 files changed, 1028 insertions(+), 23 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/PostgresConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/PostgresConstant.java
index b9a15a466..e43907bf8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/PostgresConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/PostgresConstant.java
@@ -49,6 +49,6 @@ public class PostgresConstant {
 
     public static final String URL = "url";
 
-    public static final String JDBC = "jdbc";
+    public static final String JDBC = "jdbc-inlong";
 
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index fd54d4589..60a895d3e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -28,6 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
@@ -52,7 +53,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
         @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad"),
         @JsonSubTypes.Type(value = FileSystemLoadNode.class, name = "fileSystemLoad"),
-        @JsonSubTypes.Type(value = PostgresLoadNode.class, name = "postgresLoad")
+        @JsonSubTypes.Type(value = PostgresLoadNode.class, name = "postgresLoad"),
+        @JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad")
 })
 @NoArgsConstructor
 @Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 57f9f4522..ef2f172fc 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -27,6 +27,7 @@ import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
@@ -60,7 +61,8 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
         @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad"),
         @JsonSubTypes.Type(value = PostgresLoadNode.class, name = "postgresLoad"),
-        @JsonSubTypes.Type(value = FileSystemLoadNode.class, name = "fileSystemLoad")
+        @JsonSubTypes.Type(value = FileSystemLoadNode.class, name = "fileSystemLoad"),
+        @JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad")
 })
 public interface Node {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
new file mode 100644
index 000000000..8bad0b2e0
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
@@ -0,0 +1,116 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("clickHouseLoad")
+@Data
+@NoArgsConstructor
+public class ClickHouseLoadNode extends LoadNode implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    @JsonProperty("tableName")
+    @Nonnull
+    private String tableName;
+
+    @JsonProperty("url")
+    @Nonnull
+    private String url;
+
+    @JsonProperty("userName")
+    @Nonnull
+    private String userName;
+
+    @JsonProperty("passWord")
+    @Nonnull
+    private String password;
+
+    @JsonProperty("parFields")
+    List<FieldInfo> partitionFields;
+
+    @JsonCreator
+    public ClickHouseLoadNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("tableName") String tableName,
+            @Nonnull @JsonProperty("url") String url,
+            @Nonnull @JsonProperty("userName") String userName,
+            @Nonnull @JsonProperty("passWord") String password,
+            @JsonProperty("parFields") List<FieldInfo> partitionFields) {
+        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
+        this.url = Preconditions.checkNotNull(url, "url is null");
+        this.userName = Preconditions.checkNotNull(userName, "userName is null");
+        this.password = Preconditions.checkNotNull(password, "password is null");
+        this.partitionFields = partitionFields;
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+        options.put("connector", "jdbc-inlong");
+        options.put("dialect-impl", "org.apache.inlong.sort.flink.clickhouse.table.ClickHouseDialect");
+        options.put("url", url);
+        options.put("table-name", tableName);
+        options.put("username", userName);
+        options.put("password", password);
+        return options;
+    }
+
+    @Override
+    public String genTableName() {
+        return tableName;
+    }
+
+    @Override
+    public String getPrimaryKey() {
+        return super.getPrimaryKey();
+    }
+
+    @Override
+    public List<FieldInfo> getPartitionFields() {
+        return partitionFields;
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
new file mode 100644
index 000000000..4d6056edf
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+
+import java.util.Arrays;
+
+/**
+ * Test for {@link ClickHouseLoadNode} Serialization/Deserialization.
+ */
+public class ClickHouseLoadNodeTest extends SerializeBaseTest<Node> {
+    @Override
+    public Node getTestObject() {
+
+        return new ClickHouseLoadNode("2", "test_clickhouse",
+                Arrays.asList(new FieldInfo("id", new StringFormatInfo())),
+                Arrays.asList(new FieldRelationShip(new FieldInfo("id", new StringFormatInfo()),
+                        new FieldInfo("id", new StringFormatInfo()))),
+                null,
+                null,
+                1,
+                null,
+                "ck_demo",
+                "jdbc:clickhouse://localhost:8023/default",
+                "root",
+                "root",
+                null);
+    }
+}
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index ba0d80efc..502d1508c 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -124,6 +124,25 @@
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client-all</artifactId>
         </dependency>
+        <!--for hbase-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
+        </dependency>
+        <!--for postgresql-->
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-postgres-cdc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+        </dependency>
+        <!--for jdbc-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc_${flink.scala.binary.version}</artifactId>
+        </dependency>
 
         <!--format dependency-->
         <dependency>
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseDialect.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseDialect.java
new file mode 100644
index 000000000..950e8e09c
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseDialect.java
@@ -0,0 +1,122 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.flink.jdbc.table.AbstractJdbcDialect;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * JDBC dialect for ClickHouse SQL.
+ */
+public class ClickHouseDialect extends AbstractJdbcDialect {
+
+    // Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs:
+    // https://clickhouse.com/docs/zh/sql-reference/data-types/datetime64
+    private static final int MAX_TIMESTAMP_PRECISION = 8;
+    private static final int MIN_TIMESTAMP_PRECISION = 0;
+
+    // Define MAX/MIN precision of DECIMAL type according to ClickHouse docs:
+    // https://clickhouse.com/docs/zh/sql-reference/data-types/decimal/
+    private static final int MAX_DECIMAL_PRECISION = 128;
+    private static final int MIN_DECIMAL_PRECISION = 32;
+
+    @Override
+    public String dialectName() {
+        return "ClickHouse";
+    }
+
+    @Override
+    public boolean canHandle(String url) {
+        return url.startsWith("jdbc:clickhouse:");
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter(RowType rowType) {
+        return new ClickHouseRowConverter(rowType);
+    }
+
+    @Override
+    public String getLimitClause(long limit) {
+        return "LIMIT " + limit;
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return "`" + identifier + "`";
+    }
+
+    @Override
+    public int maxDecimalPrecision() {
+        return MAX_DECIMAL_PRECISION;
+    }
+
+    @Override
+    public int minDecimalPrecision() {
+        return MIN_DECIMAL_PRECISION;
+
+    }
+
+    @Override
+    public int maxTimestampPrecision() {
+        return MAX_TIMESTAMP_PRECISION;
+    }
+
+    @Override
+    public int minTimestampPrecision() {
+        return MIN_TIMESTAMP_PRECISION;
+    }
+
+    /**
+     * Defines the unsupported types for the dialect.
+     *
+     * @return a list of logical type roots.
+     */
+    public List<LogicalTypeRoot> unsupportedTypes() {
+        // ClickHouse support data type in
+        // https://clickhouse.com/docs/en/sql-reference/data-types/
+        return Arrays.asList(
+                LogicalTypeRoot.BINARY,
+                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+                LogicalTypeRoot.INTERVAL_YEAR_MONTH,
+                LogicalTypeRoot.INTERVAL_DAY_TIME,
+                LogicalTypeRoot.ARRAY,
+                LogicalTypeRoot.MULTISET,
+                LogicalTypeRoot.MAP,
+                LogicalTypeRoot.ROW,
+                LogicalTypeRoot.DISTINCT_TYPE,
+                LogicalTypeRoot.STRUCTURED_TYPE,
+                LogicalTypeRoot.NULL,
+                LogicalTypeRoot.RAW,
+                LogicalTypeRoot.SYMBOL,
+                LogicalTypeRoot.UNRESOLVED);
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseRowConverter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseRowConverter.java
new file mode 100644
index 000000000..70cad5423
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseRowConverter.java
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Runtime converter that responsible to convert between JDBC object and Flink internal object for
+ * Derby.
+ */
+public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String converterName() {
+        return "ClickHouse";
+    }
+
+    public ClickHouseRowConverter(RowType rowType) {
+        super(rowType);
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/AbstractJdbcDialect.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/AbstractJdbcDialect.java
new file mode 100644
index 000000000..27929ecd6
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/AbstractJdbcDialect.java
@@ -0,0 +1,103 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.jdbc.table;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+
+import java.util.List;
+
+/**
+ * Default JDBC dialects implements for validate.
+ */
+public abstract class AbstractJdbcDialect implements JdbcDialect {
+
+    @Override
+    public void validate(TableSchema schema) throws ValidationException {
+        for (int i = 0; i < schema.getFieldCount(); i++) {
+            DataType dt = schema.getFieldDataType(i).get();
+            String fieldName = schema.getFieldName(i).get();
+
+            // TODO: We can't convert VARBINARY(n) data type to
+            //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+            // LegacyTypeInfoDataTypeConverter
+            //  when n is smaller than Integer.MAX_VALUE
+            if (unsupportedTypes().contains(dt.getLogicalType().getTypeRoot())
+                    || (dt.getLogicalType() instanceof VarBinaryType
+                    && Integer.MAX_VALUE
+                    != ((VarBinaryType) dt.getLogicalType()).getLength())) {
+                throw new ValidationException(
+                        String.format(
+                                "The %s dialect doesn't support type: %s.",
+                                dialectName(), dt.toString()));
+            }
+
+            // only validate precision of DECIMAL type for blink planner
+            if (dt.getLogicalType() instanceof DecimalType) {
+                int precision = ((DecimalType) dt.getLogicalType()).getPrecision();
+                if (precision > maxDecimalPrecision() || precision < minDecimalPrecision()) {
+                    throw new ValidationException(
+                            String.format(
+                                    "The precision of field '%s' is out of the DECIMAL "
+                                            + "precision range [%d, %d] supported by %s dialect.",
+                                    fieldName,
+                                    minDecimalPrecision(),
+                                    maxDecimalPrecision(),
+                                    dialectName()));
+                }
+            }
+
+            // only validate precision of DECIMAL type for blink planner
+            if (dt.getLogicalType() instanceof TimestampType) {
+                int precision = ((TimestampType) dt.getLogicalType()).getPrecision();
+                if (precision > maxTimestampPrecision() || precision < minTimestampPrecision()) {
+                    throw new ValidationException(
+                            String.format(
+                                    "The precision of field '%s' is out of the TIMESTAMP "
+                                            + "precision range [%d, %d] supported by %s dialect.",
+                                    fieldName,
+                                    minTimestampPrecision(),
+                                    maxTimestampPrecision(),
+                                    dialectName()));
+                }
+            }
+        }
+    }
+
+    public abstract int maxDecimalPrecision();
+
+    public abstract int minDecimalPrecision();
+
+    public abstract int maxTimestampPrecision();
+
+    public abstract int minTimestampPrecision();
+
+    /**
+     * Defines the unsupported types for the dialect.
+     *
+     * @return a list of logical type roots.
+     */
+    public abstract List<LogicalTypeRoot> unsupportedTypes();
+}
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDialects.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDialects.java
new file mode 100644
index 000000000..326798211
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDialects.java
@@ -0,0 +1,60 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.jdbc.table;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
+import org.apache.flink.connector.jdbc.dialect.PostgresDialect;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Default JDBC dialects.
+ */
+public final class JdbcDialects {
+
+    private static final List<JdbcDialect> DIALECTS = new ArrayList<>();
+
+    static {
+        DIALECTS.add(new MySQLDialect());
+        DIALECTS.add(new PostgresDialect());
+    }
+
+    /** Fetch the JdbcDialect class corresponding to a given database url. */
+    public static Optional<JdbcDialect> get(String url) {
+        for (JdbcDialect dialect : DIALECTS) {
+            if (dialect.canHandle(url)) {
+                return Optional.of(dialect);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /** Fetch the JdbcDialect class corresponding to a given database url. */
+    public static void register(String dialectImpl) {
+        try {
+            JdbcDialect dialect = (JdbcDialect) Class.forName(dialectImpl).newInstance();
+            DIALECTS.add(dialect);
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            throw new IllegalArgumentException("Cannot register such dialect impl: " + dialectImpl, e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDynamicTableFactory.java
new file mode 100644
index 000000000..896f186d2
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDynamicTableFactory.java
@@ -0,0 +1,384 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.jdbc.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
+ *
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource} and {@link
+ * JdbcDynamicTableSink}.We modify it to strengthen capacity of registering other dialect.
+ */
+public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "jdbc-inlong";
+    public static final ConfigOption<String> DIALECT_IMPL =
+            ConfigOptions.key("dialect-impl")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC Custom Dialect.");
+    public static final ConfigOption<String> URL =
+            ConfigOptions.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC database URL.");
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC table name.");
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC user name.");
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC password.");
+    private static final ConfigOption<String> DRIVER =
+            ConfigOptions.key("driver")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The class name of the JDBC driver to use to connect to this URL. "
+                                    + "If not set, it will automatically be derived from the URL.");
+    public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT =
+            ConfigOptions.key("connection.max-retry-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(60))
+                    .withDescription("Maximum timeout between retries.");
+
+    // read config options
+    private static final ConfigOption<String> SCAN_PARTITION_COLUMN =
+            ConfigOptions.key("scan.partition.column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The column name used for partitioning the input.");
+    private static final ConfigOption<Integer> SCAN_PARTITION_NUM =
+            ConfigOptions.key("scan.partition.num")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("The number of partitions.");
+    private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND =
+            ConfigOptions.key("scan.partition.lower-bound")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("The smallest value of the first partition.");
+    private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND =
+            ConfigOptions.key("scan.partition.upper-bound")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("The largest value of the last partition.");
+    private static final ConfigOption<Integer> SCAN_FETCH_SIZE =
+            ConfigOptions.key("scan.fetch-size")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Gives the reader a hint as to the number of rows that should be fetched "
+                                    + "from the database per round-trip when reading. "
+                                    + "If the value is zero, this hint is ignored.");
+    private static final ConfigOption<Boolean> SCAN_AUTO_COMMIT =
+            ConfigOptions.key("scan.auto-commit")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Sets whether the driver is in auto-commit mode.");
+
+    // look up config options
+    private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+            ConfigOptions.key("lookup.cache.max-rows")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "The max number of rows of lookup cache, over this value, the oldest rows will "
+                                    + "be eliminated. \"cache.max-rows\" and \"cache.ttl\""
+                                    + " options must all be specified if any of them is specified.");
+    private static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+            ConfigOptions.key("lookup.cache.ttl")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The cache time to live.");
+    private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+            ConfigOptions.key("lookup.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The max retry times if lookup database failed.");
+
+    // write config options
+    private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
+            ConfigOptions.key("sink.buffer-flush.max-rows")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription(
+                            "The flush max size (includes all append, upsert and delete records), over this number"
+                                    + " of records, will flush data.");
+    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
+            ConfigOptions.key("sink.buffer-flush.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription(
+                            "The flush interval mills, over this time, asynchronous threads will flush data.");
+    private static final ConfigOption<Integer> SINK_MAX_RETRIES =
+            ConfigOptions.key("sink.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The max retry times if writing records to database failed.");
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig config = helper.getOptions();
+
+        helper.validate();
+        validateConfigOptions(config);
+        JdbcOptions jdbcOptions = getJdbcOptions(config);
+        TableSchema physicalSchema =
+                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+        return new JdbcDynamicTableSink(
+                jdbcOptions,
+                getJdbcExecutionOptions(config),
+                getJdbcDmlOptions(jdbcOptions, physicalSchema),
+                physicalSchema);
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig config = helper.getOptions();
+
+        helper.validate();
+        validateConfigOptions(config);
+        TableSchema physicalSchema =
+                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        return new JdbcDynamicTableSource(
+                getJdbcOptions(helper.getOptions()),
+                getJdbcReadOptions(helper.getOptions()),
+                getJdbcLookupOptions(helper.getOptions()),
+                physicalSchema);
+    }
+
+    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
+        final String url = readableConfig.get(URL);
+        final JdbcOptions.Builder builder =
+                JdbcOptions.builder()
+                        .setDBUrl(url)
+                        .setTableName(readableConfig.get(TABLE_NAME))
+                        .setDialect(JdbcDialects.get(url).get())
+                        .setParallelism(
+                                readableConfig
+                                        .getOptional(FactoryUtil.SINK_PARALLELISM)
+                                        .orElse(null))
+                        .setConnectionCheckTimeoutSeconds(
+                                (int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
+
+        readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
+        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+        return builder.build();
+    }
+
+    private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
+        final Optional<String> partitionColumnName =
+                readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+        final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+        if (partitionColumnName.isPresent()) {
+            builder.setPartitionColumnName(partitionColumnName.get());
+            builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
+            builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
+            builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
+        }
+        readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
+        builder.setAutoCommit(readableConfig.get(SCAN_AUTO_COMMIT));
+        return builder.build();
+    }
+
+    private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig readableConfig) {
+        return new JdbcLookupOptions(
+                readableConfig.get(LOOKUP_CACHE_MAX_ROWS),
+                readableConfig.get(LOOKUP_CACHE_TTL).toMillis(),
+                readableConfig.get(LOOKUP_MAX_RETRIES));
+    }
+
+    private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
+        final JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder();
+        builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+        builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+        builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
+        return builder.build();
+    }
+
+    private JdbcDmlOptions getJdbcDmlOptions(JdbcOptions jdbcOptions, TableSchema schema) {
+        String[] keyFields =
+                schema.getPrimaryKey()
+                        .map(pk -> pk.getColumns().toArray(new String[0]))
+                        .orElse(null);
+
+        return JdbcDmlOptions.builder()
+                .withTableName(jdbcOptions.getTableName())
+                .withDialect(jdbcOptions.getDialect())
+                .withFieldNames(schema.getFieldNames())
+                .withKeyFields(keyFields)
+                .build();
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+        requiredOptions.add(URL);
+        requiredOptions.add(TABLE_NAME);
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+        optionalOptions.add(DRIVER);
+        optionalOptions.add(USERNAME);
+        optionalOptions.add(PASSWORD);
+        optionalOptions.add(SCAN_PARTITION_COLUMN);
+        optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
+        optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
+        optionalOptions.add(SCAN_PARTITION_NUM);
+        optionalOptions.add(SCAN_FETCH_SIZE);
+        optionalOptions.add(SCAN_AUTO_COMMIT);
+        optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
+        optionalOptions.add(LOOKUP_CACHE_TTL);
+        optionalOptions.add(LOOKUP_MAX_RETRIES);
+        optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+        optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
+        optionalOptions.add(SINK_MAX_RETRIES);
+        optionalOptions.add(FactoryUtil.SINK_PARALLELISM);
+        optionalOptions.add(MAX_RETRY_TIMEOUT);
+        optionalOptions.add(DIALECT_IMPL);
+        return optionalOptions;
+    }
+
+    private void validateConfigOptions(ReadableConfig config) {
+        // register custom dialect first
+        config.getOptional(DIALECT_IMPL).ifPresent(JdbcDialects::register);
+        String jdbcUrl = config.get(URL);
+        final Optional<JdbcDialect> dialect = JdbcDialects.get(jdbcUrl);
+        checkState(dialect.isPresent(), "Cannot handle such jdbc url: " + jdbcUrl);
+
+        checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
+
+        checkAllOrNone(
+                config,
+                new ConfigOption[] {
+                        SCAN_PARTITION_COLUMN,
+                        SCAN_PARTITION_NUM,
+                        SCAN_PARTITION_LOWER_BOUND,
+                        SCAN_PARTITION_UPPER_BOUND
+                });
+
+        if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
+                && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
+            long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
+            long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND);
+            if (lowerBound > upperBound) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "'%s'='%s' must not be larger than '%s'='%s'.",
+                                SCAN_PARTITION_LOWER_BOUND.key(),
+                                lowerBound,
+                                SCAN_PARTITION_UPPER_BOUND.key(),
+                                upperBound));
+            }
+        }
+
+        checkAllOrNone(config, new ConfigOption[] {LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
+
+        if (config.get(LOOKUP_MAX_RETRIES) < 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option shouldn't be negative, but is %s.",
+                            LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES)));
+        }
+
+        if (config.get(SINK_MAX_RETRIES) < 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option shouldn't be negative, but is %s.",
+                            SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES)));
+        }
+
+        if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option must be in second granularity and shouldn't be "
+                                    + "smaller than 1 second, but is %s.",
+                            MAX_RETRY_TIMEOUT.key(),
+                            config.get(
+                                    ConfigOptions.key(MAX_RETRY_TIMEOUT.key())
+                                            .stringType()
+                                            .noDefaultValue())));
+        }
+    }
+
+    private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
+        int presentCount = 0;
+        for (ConfigOption configOption : configOptions) {
+            if (config.getOptional(configOption).isPresent()) {
+                presentCount++;
+            }
+        }
+        String[] propertyNames =
+                Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
+        Preconditions.checkArgument(
+                configOptions.length == presentCount || presentCount == 0,
+                "Either all or none of the following options should be provided:\n"
+                        + String.join("\n", propertyNames));
+    }
+}
diff --git a/inlong-sort/sort-connectors/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 83e2d7a4d..e4286c7ff 100644
--- a/inlong-sort/sort-connectors/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-connectors/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
 # limitations under the License.
 
 org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory
+org.apache.inlong.sort.flink.jdbc.table.JdbcDynamicTableFactory
 org.apache.inlong.sort.flink.pulsar.table.PulsarDynamicTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 6c10a1de2..ee48040d0 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -300,26 +300,6 @@
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.ververica</groupId>
-            <artifactId>flink-connector-postgres-cdc</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-jdbc_${flink.scala.binary.version}</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.postgresql</groupId>
-            <artifactId>postgresql</artifactId>
-        </dependency>
-
     </dependencies>
 
     <build>
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/ClickHouseSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/ClickHouseSqlParserTest.java
new file mode 100644
index 000000000..19b57cbba
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/ClickHouseSqlParserTest.java
@@ -0,0 +1,126 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.flink.clickhouse.table.ClickHouseDialect;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for  {@link ClickHouseLoadNode} and {@link ClickHouseDialect}
+ */
+public class ClickHouseSqlParserTest {
+    public MySqlExtractNode buildMySQLExtractNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        map.put("append-mode", "true");
+        return new MySqlExtractNode(id, "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("work1"), "localhost", "root", "123456",
+                "inlong", null, null,
+                null, null);
+    }
+
+    private ClickHouseLoadNode buildClickHouseLoadNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        List<FieldRelationShip> relations = Arrays
+                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())));
+
+        return new ClickHouseLoadNode(id, "test_clickhouse",
+                fields,
+                relations,
+                null,
+                null,
+                1,
+                null,
+                "ck_demo",
+                "jdbc:clickhouse://localhost:8123/demo",
+                "default",
+                "",
+                null);
+
+    }
+
+    /**
+     * build node relation
+     *
+     * @param inputs extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelationShip(inputIds, outputIds);
+    }
+
+    /**
+     * Test extract data from mysql and load data to clickhouse.
+     *
+     * @throws Exception The exception may throws when executing
+     */
+    @Test
+    public void testClickHouse() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode("1");
+        Node outputNode = buildClickHouseLoadNode("2");
+        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
+                Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("group_id", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+}