Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/20 08:23:27 UTC
[incubator-inlong] branch master updated: [InLong-4227][Sort] Sort lightweight supports loading data to ClickHouse (#4249)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8f40ecfdb [InLong-4227][Sort] Sort lightweight supports loading data to ClickHouse (#4249)
8f40ecfdb is described below
commit 8f40ecfdb539b335208becd592e66725259525d1
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Fri May 20 16:23:20 2022 +0800
[InLong-4227][Sort] Sort lightweight supports loading data to ClickHouse (#4249)
---
.../sort/protocol/constant/PostgresConstant.java | 2 +-
.../apache/inlong/sort/protocol/node/LoadNode.java | 4 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 4 +-
.../protocol/node/load/ClickHouseLoadNode.java | 116 +++++++
.../protocol/node/load/ClickHouseLoadNodeTest.java | 50 +++
inlong-sort/sort-connectors/pom.xml | 19 +
.../flink/clickhouse/table/ClickHouseDialect.java | 122 +++++++
.../clickhouse/table/ClickHouseRowConverter.java | 40 +++
.../sort/flink/jdbc/table/AbstractJdbcDialect.java | 103 ++++++
.../inlong/sort/flink/jdbc/table/JdbcDialects.java | 60 ++++
.../flink/jdbc/table/JdbcDynamicTableFactory.java | 384 +++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 1 +
inlong-sort/sort-single-tenant/pom.xml | 20 --
.../flink/parser/ClickHouseSqlParserTest.java | 126 +++++++
14 files changed, 1028 insertions(+), 23 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/PostgresConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/PostgresConstant.java
index b9a15a466..e43907bf8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/PostgresConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/PostgresConstant.java
@@ -49,6 +49,6 @@ public class PostgresConstant {
public static final String URL = "url";
- public static final String JDBC = "jdbc";
+ public static final String JDBC = "jdbc-inlong";
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index fd54d4589..60a895d3e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -28,6 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
@@ -52,7 +53,8 @@ import java.util.Map;
@JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
@JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad"),
@JsonSubTypes.Type(value = FileSystemLoadNode.class, name = "fileSystemLoad"),
- @JsonSubTypes.Type(value = PostgresLoadNode.class, name = "postgresLoad")
+ @JsonSubTypes.Type(value = PostgresLoadNode.class, name = "postgresLoad"),
+ @JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad")
})
@NoArgsConstructor
@Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 57f9f4522..ef2f172fc 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -27,6 +27,7 @@ import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
@@ -60,7 +61,8 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
@JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad"),
@JsonSubTypes.Type(value = PostgresLoadNode.class, name = "postgresLoad"),
- @JsonSubTypes.Type(value = FileSystemLoadNode.class, name = "fileSystemLoad")
+ @JsonSubTypes.Type(value = FileSystemLoadNode.class, name = "fileSystemLoad"),
+ @JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad")
})
public interface Node {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
new file mode 100644
index 000000000..8bad0b2e0
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("clickHouseLoad")
+@Data
+@NoArgsConstructor
+public class ClickHouseLoadNode extends LoadNode implements Serializable {
+
+ private static final long serialVersionUID = -1L;
+
+ @JsonProperty("tableName")
+ @Nonnull
+ private String tableName;
+
+ @JsonProperty("url")
+ @Nonnull
+ private String url;
+
+ @JsonProperty("userName")
+ @Nonnull
+ private String userName;
+
+ @JsonProperty("passWord")
+ @Nonnull
+ private String password;
+
+ @JsonProperty("parFields")
+ private List<FieldInfo> partitionFields;
+
+ @JsonCreator
+ public ClickHouseLoadNode(@JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @JsonProperty("properties") Map<String, String> properties,
+ @Nonnull @JsonProperty("tableName") String tableName,
+ @Nonnull @JsonProperty("url") String url,
+ @Nonnull @JsonProperty("userName") String userName,
+ @Nonnull @JsonProperty("passWord") String password,
+ @JsonProperty("parFields") List<FieldInfo> partitionFields) {
+ super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
+ this.url = Preconditions.checkNotNull(url, "url is null");
+ this.userName = Preconditions.checkNotNull(userName, "userName is null");
+ this.password = Preconditions.checkNotNull(password, "password is null");
+ this.partitionFields = partitionFields;
+ }
+
+ @Override
+ public Map<String, String> tableOptions() {
+ Map<String, String> options = super.tableOptions();
+ options.put("connector", "jdbc-inlong");
+ options.put("dialect-impl", "org.apache.inlong.sort.flink.clickhouse.table.ClickHouseDialect");
+ options.put("url", url);
+ options.put("table-name", tableName);
+ options.put("username", userName);
+ options.put("password", password);
+ return options;
+ }
+
+ @Override
+ public String genTableName() {
+ return tableName;
+ }
+
+ @Override
+ public String getPrimaryKey() {
+ return super.getPrimaryKey();
+ }
+
+ @Override
+ public List<FieldInfo> getPartitionFields() {
+ return partitionFields;
+ }
+
+}
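For illustration only (not part of this commit), a minimal sketch of how the new load node is constructed and which connector options it contributes to the generated Flink DDL; all values below are hypothetical:

    // Hypothetical values; mirrors the constructor added above.
    ClickHouseLoadNode node = new ClickHouseLoadNode(
            "2", "ck_output",
            Arrays.asList(new FieldInfo("id", new StringFormatInfo())),
            Arrays.asList(new FieldRelationShip(
                    new FieldInfo("id", new StringFormatInfo()),
                    new FieldInfo("id", new StringFormatInfo()))),
            null, null, 1, null,
            "ck_demo",
            "jdbc:clickhouse://localhost:8123/default",
            "default", "",
            null);
    // tableOptions() returns connector=jdbc-inlong, the ClickHouseDialect as dialect-impl,
    // plus the url, table-name, username and password set above.
    Map<String, String> options = node.tableOptions();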
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
new file mode 100644
index 000000000..4d6056edf
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+
+import java.util.Arrays;
+
+/**
+ * Test for {@link ClickHouseLoadNode} serialization and deserialization.
+ */
+public class ClickHouseLoadNodeTest extends SerializeBaseTest<Node> {
+ @Override
+ public Node getTestObject() {
+
+ return new ClickHouseLoadNode("2", "test_clickhouse",
+ Arrays.asList(new FieldInfo("id", new StringFormatInfo())),
+ Arrays.asList(new FieldRelationShip(new FieldInfo("id", new StringFormatInfo()),
+ new FieldInfo("id", new StringFormatInfo()))),
+ null,
+ null,
+ 1,
+ null,
+ "ck_demo",
+ "jdbc:clickhouse://localhost:8023/default",
+ "root",
+ "root",
+ null);
+ }
+}
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index ba0d80efc..502d1508c 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -124,6 +124,25 @@
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
</dependency>
+ <!--for hbase-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
+ </dependency>
+ <!--for postgresql-->
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-postgres-cdc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
+ <!--for jdbc-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-jdbc_${flink.scala.binary.version}</artifactId>
+ </dependency>
<!--format dependency-->
<dependency>
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseDialect.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseDialect.java
new file mode 100644
index 000000000..950e8e09c
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseDialect.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.flink.jdbc.table.AbstractJdbcDialect;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * JDBC dialect for ClickHouse SQL.
+ */
+public class ClickHouseDialect extends AbstractJdbcDialect {
+
+ // Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs:
+ // https://clickhouse.com/docs/zh/sql-reference/data-types/datetime64
+ private static final int MAX_TIMESTAMP_PRECISION = 8;
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+
+ // Define MAX/MIN precision of DECIMAL type according to ClickHouse docs:
+ // https://clickhouse.com/docs/zh/sql-reference/data-types/decimal/
+ private static final int MAX_DECIMAL_PRECISION = 128;
+ private static final int MIN_DECIMAL_PRECISION = 32;
+
+ @Override
+ public String dialectName() {
+ return "ClickHouse";
+ }
+
+ @Override
+ public boolean canHandle(String url) {
+ return url.startsWith("jdbc:clickhouse:");
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter(RowType rowType) {
+ return new ClickHouseRowConverter(rowType);
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "LIMIT " + limit;
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
+ @Override
+ public int maxDecimalPrecision() {
+ return MAX_DECIMAL_PRECISION;
+ }
+
+ @Override
+ public int minDecimalPrecision() {
+ return MIN_DECIMAL_PRECISION;
+ }
+
+ @Override
+ public int maxTimestampPrecision() {
+ return MAX_TIMESTAMP_PRECISION;
+ }
+
+ @Override
+ public int minTimestampPrecision() {
+ return MIN_TIMESTAMP_PRECISION;
+ }
+
+ /**
+ * Defines the unsupported types for the dialect.
+ *
+ * @return a list of logical type roots.
+ */
+ public List<LogicalTypeRoot> unsupportedTypes() {
+ // ClickHouse's supported data types are listed in
+ // https://clickhouse.com/docs/en/sql-reference/data-types/
+ return Arrays.asList(
+ LogicalTypeRoot.BINARY,
+ LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+ LogicalTypeRoot.INTERVAL_YEAR_MONTH,
+ LogicalTypeRoot.INTERVAL_DAY_TIME,
+ LogicalTypeRoot.ARRAY,
+ LogicalTypeRoot.MULTISET,
+ LogicalTypeRoot.MAP,
+ LogicalTypeRoot.ROW,
+ LogicalTypeRoot.DISTINCT_TYPE,
+ LogicalTypeRoot.STRUCTURED_TYPE,
+ LogicalTypeRoot.NULL,
+ LogicalTypeRoot.RAW,
+ LogicalTypeRoot.SYMBOL,
+ LogicalTypeRoot.UNRESOLVED);
+ }
+
+}
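A quick sketch (not part of this commit) of the dialect contract defined above, exercising only the methods it overrides:

    ClickHouseDialect dialect = new ClickHouseDialect();
    dialect.canHandle("jdbc:clickhouse://localhost:8123/default"); // true
    dialect.quoteIdentifier("order");                              // "`order`"
    dialect.getLimitClause(10L);                                   // "LIMIT 10"
    dialect.defaultDriverName();                                   // Optional["ru.yandex.clickhouse.ClickHouseDriver"]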
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseRowConverter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseRowConverter.java
new file mode 100644
index 000000000..70cad5423
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/table/ClickHouseRowConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Runtime converter responsible for converting between JDBC objects and Flink internal
+ * objects for ClickHouse.
+ */
+public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String converterName() {
+ return "ClickHouse";
+ }
+
+ public ClickHouseRowConverter(RowType rowType) {
+ super(rowType);
+ }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/AbstractJdbcDialect.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/AbstractJdbcDialect.java
new file mode 100644
index 000000000..27929ecd6
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/AbstractJdbcDialect.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.jdbc.table;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+
+import java.util.List;
+
+/**
+ * Base JDBC dialect implementation that provides schema validation.
+ */
+public abstract class AbstractJdbcDialect implements JdbcDialect {
+
+ @Override
+ public void validate(TableSchema schema) throws ValidationException {
+ for (int i = 0; i < schema.getFieldCount(); i++) {
+ DataType dt = schema.getFieldDataType(i).get();
+ String fieldName = schema.getFieldName(i).get();
+
+ // TODO: We can't convert VARBINARY(n) data type to
+ // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+ // LegacyTypeInfoDataTypeConverter
+ // when n is smaller than Integer.MAX_VALUE
+ if (unsupportedTypes().contains(dt.getLogicalType().getTypeRoot())
+ || (dt.getLogicalType() instanceof VarBinaryType
+ && Integer.MAX_VALUE
+ != ((VarBinaryType) dt.getLogicalType()).getLength())) {
+ throw new ValidationException(
+ String.format(
+ "The %s dialect doesn't support type: %s.",
+ dialectName(), dt.toString()));
+ }
+
+ // only validate precision of DECIMAL type for blink planner
+ if (dt.getLogicalType() instanceof DecimalType) {
+ int precision = ((DecimalType) dt.getLogicalType()).getPrecision();
+ if (precision > maxDecimalPrecision() || precision < minDecimalPrecision()) {
+ throw new ValidationException(
+ String.format(
+ "The precision of field '%s' is out of the DECIMAL "
+ + "precision range [%d, %d] supported by %s dialect.",
+ fieldName,
+ minDecimalPrecision(),
+ maxDecimalPrecision(),
+ dialectName()));
+ }
+ }
+
+ // only validate precision of TIMESTAMP type for blink planner
+ if (dt.getLogicalType() instanceof TimestampType) {
+ int precision = ((TimestampType) dt.getLogicalType()).getPrecision();
+ if (precision > maxTimestampPrecision() || precision < minTimestampPrecision()) {
+ throw new ValidationException(
+ String.format(
+ "The precision of field '%s' is out of the TIMESTAMP "
+ + "precision range [%d, %d] supported by %s dialect.",
+ fieldName,
+ minTimestampPrecision(),
+ maxTimestampPrecision(),
+ dialectName()));
+ }
+ }
+ }
+ }
+
+ public abstract int maxDecimalPrecision();
+
+ public abstract int minDecimalPrecision();
+
+ public abstract int maxTimestampPrecision();
+
+ public abstract int minTimestampPrecision();
+
+ /**
+ * Defines the unsupported types for the dialect.
+ *
+ * @return a list of logical type roots.
+ */
+ public abstract List<LogicalTypeRoot> unsupportedTypes();
+}
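To make the validation above concrete, a hedged sketch (not part of this commit): a DECIMAL(20, 2) column falls below ClickHouseDialect's minDecimalPrecision() of 32, so validate() rejects the schema:

    TableSchema schema = TableSchema.builder()
            .field("amount", DataTypes.DECIMAL(20, 2))
            .build();
    try {
        new ClickHouseDialect().validate(schema);
    } catch (ValidationException e) {
        // "The precision of field 'amount' is out of the DECIMAL
        //  precision range [32, 128] supported by ClickHouse dialect."
    }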
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDialects.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDialects.java
new file mode 100644
index 000000000..326798211
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDialects.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.jdbc.table;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
+import org.apache.flink.connector.jdbc.dialect.PostgresDialect;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Default JDBC dialects.
+ */
+public final class JdbcDialects {
+
+ private static final List<JdbcDialect> DIALECTS = new ArrayList<>();
+
+ static {
+ DIALECTS.add(new MySQLDialect());
+ DIALECTS.add(new PostgresDialect());
+ }
+
+ /** Fetch the JdbcDialect class corresponding to a given database url. */
+ public static Optional<JdbcDialect> get(String url) {
+ for (JdbcDialect dialect : DIALECTS) {
+ if (dialect.canHandle(url)) {
+ return Optional.of(dialect);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /** Register a custom JdbcDialect implementation by its fully qualified class name. */
+ public static void register(String dialectImpl) {
+ try {
+ JdbcDialect dialect = (JdbcDialect) Class.forName(dialectImpl).newInstance();
+ DIALECTS.add(dialect);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException("Cannot register such dialect impl: " + dialectImpl, e);
+ }
+ }
+}
\ No newline at end of file
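The register/get pair is the extension point the new connector relies on. A sketch (not part of this commit) of registering and then resolving the ClickHouse dialect by URL:

    JdbcDialects.register("org.apache.inlong.sort.flink.clickhouse.table.ClickHouseDialect");
    Optional<JdbcDialect> dialect = JdbcDialects.get("jdbc:clickhouse://localhost:8123/default");
    // dialect.isPresent() == true; dialect.get().dialectName() == "ClickHouse"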
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDynamicTableFactory.java
new file mode 100644
index 000000000..896f186d2
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/jdbc/table/JdbcDynamicTableFactory.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.jdbc.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Copied from org.apache.flink:flink-connector-jdbc_2.11:1.13.5.
+ *
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource} and {@link
+ * JdbcDynamicTableSink}. It is modified to support registering custom JDBC dialects.
+ */
+public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "jdbc-inlong";
+ public static final ConfigOption<String> DIALECT_IMPL =
+ ConfigOptions.key("dialect-impl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The JDBC Custom Dialect.");
+ public static final ConfigOption<String> URL =
+ ConfigOptions.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The JDBC database URL.");
+ public static final ConfigOption<String> TABLE_NAME =
+ ConfigOptions.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The JDBC table name.");
+ public static final ConfigOption<String> USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The JDBC user name.");
+ public static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The JDBC password.");
+ private static final ConfigOption<String> DRIVER =
+ ConfigOptions.key("driver")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The class name of the JDBC driver to use to connect to this URL. "
+ + "If not set, it will automatically be derived from the URL.");
+ public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT =
+ ConfigOptions.key("connection.max-retry-timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
+ .withDescription("Maximum timeout between retries.");
+
+ // read config options
+ private static final ConfigOption<String> SCAN_PARTITION_COLUMN =
+ ConfigOptions.key("scan.partition.column")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The column name used for partitioning the input.");
+ private static final ConfigOption<Integer> SCAN_PARTITION_NUM =
+ ConfigOptions.key("scan.partition.num")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The number of partitions.");
+ private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND =
+ ConfigOptions.key("scan.partition.lower-bound")
+ .longType()
+ .noDefaultValue()
+ .withDescription("The smallest value of the first partition.");
+ private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND =
+ ConfigOptions.key("scan.partition.upper-bound")
+ .longType()
+ .noDefaultValue()
+ .withDescription("The largest value of the last partition.");
+ private static final ConfigOption<Integer> SCAN_FETCH_SIZE =
+ ConfigOptions.key("scan.fetch-size")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "Gives the reader a hint as to the number of rows that should be fetched "
+ + "from the database per round-trip when reading. "
+ + "If the value is zero, this hint is ignored.");
+ private static final ConfigOption<Boolean> SCAN_AUTO_COMMIT =
+ ConfigOptions.key("scan.auto-commit")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Sets whether the driver is in auto-commit mode.");
+
+ // look up config options
+ private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+ ConfigOptions.key("lookup.cache.max-rows")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription(
+ "The max number of rows of lookup cache, over this value, the oldest rows will "
+ + "be eliminated. \"cache.max-rows\" and \"cache.ttl\""
+ + " options must all be specified if any of them is specified.");
+ private static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+ ConfigOptions.key("lookup.cache.ttl")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
+ .withDescription("The cache time to live.");
+ private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+ ConfigOptions.key("lookup.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription("The max retry times if lookup database failed.");
+
+ // write config options
+ private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
+ ConfigOptions.key("sink.buffer-flush.max-rows")
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "The flush max size (includes all append, upsert and delete records), over this number"
+ + " of records, will flush data.");
+ private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
+ ConfigOptions.key("sink.buffer-flush.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription(
+ "The flush interval mills, over this time, asynchronous threads will flush data.");
+ private static final ConfigOption<Integer> SINK_MAX_RETRIES =
+ ConfigOptions.key("sink.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription("The max retry times if writing records to database failed.");
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ final ReadableConfig config = helper.getOptions();
+
+ helper.validate();
+ validateConfigOptions(config);
+ JdbcOptions jdbcOptions = getJdbcOptions(config);
+ TableSchema physicalSchema =
+ TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+ return new JdbcDynamicTableSink(
+ jdbcOptions,
+ getJdbcExecutionOptions(config),
+ getJdbcDmlOptions(jdbcOptions, physicalSchema),
+ physicalSchema);
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ final ReadableConfig config = helper.getOptions();
+
+ helper.validate();
+ validateConfigOptions(config);
+ TableSchema physicalSchema =
+ TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ return new JdbcDynamicTableSource(
+ getJdbcOptions(helper.getOptions()),
+ getJdbcReadOptions(helper.getOptions()),
+ getJdbcLookupOptions(helper.getOptions()),
+ physicalSchema);
+ }
+
+ private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
+ final String url = readableConfig.get(URL);
+ final JdbcOptions.Builder builder =
+ JdbcOptions.builder()
+ .setDBUrl(url)
+ .setTableName(readableConfig.get(TABLE_NAME))
+ .setDialect(JdbcDialects.get(url).get())
+ .setParallelism(
+ readableConfig
+ .getOptional(FactoryUtil.SINK_PARALLELISM)
+ .orElse(null))
+ .setConnectionCheckTimeoutSeconds(
+ (int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
+
+ readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
+ readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+ readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+ return builder.build();
+ }
+
+ private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
+ final Optional<String> partitionColumnName =
+ readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+ final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+ if (partitionColumnName.isPresent()) {
+ builder.setPartitionColumnName(partitionColumnName.get());
+ builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
+ builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
+ builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
+ }
+ readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
+ builder.setAutoCommit(readableConfig.get(SCAN_AUTO_COMMIT));
+ return builder.build();
+ }
+
+ private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig readableConfig) {
+ return new JdbcLookupOptions(
+ readableConfig.get(LOOKUP_CACHE_MAX_ROWS),
+ readableConfig.get(LOOKUP_CACHE_TTL).toMillis(),
+ readableConfig.get(LOOKUP_MAX_RETRIES));
+ }
+
+ private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
+ final JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder();
+ builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+ builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+ builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
+ return builder.build();
+ }
+
+ private JdbcDmlOptions getJdbcDmlOptions(JdbcOptions jdbcOptions, TableSchema schema) {
+ String[] keyFields =
+ schema.getPrimaryKey()
+ .map(pk -> pk.getColumns().toArray(new String[0]))
+ .orElse(null);
+
+ return JdbcDmlOptions.builder()
+ .withTableName(jdbcOptions.getTableName())
+ .withDialect(jdbcOptions.getDialect())
+ .withFieldNames(schema.getFieldNames())
+ .withKeyFields(keyFields)
+ .build();
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(URL);
+ requiredOptions.add(TABLE_NAME);
+ return requiredOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+ optionalOptions.add(DRIVER);
+ optionalOptions.add(USERNAME);
+ optionalOptions.add(PASSWORD);
+ optionalOptions.add(SCAN_PARTITION_COLUMN);
+ optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
+ optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
+ optionalOptions.add(SCAN_PARTITION_NUM);
+ optionalOptions.add(SCAN_FETCH_SIZE);
+ optionalOptions.add(SCAN_AUTO_COMMIT);
+ optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
+ optionalOptions.add(LOOKUP_CACHE_TTL);
+ optionalOptions.add(LOOKUP_MAX_RETRIES);
+ optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+ optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
+ optionalOptions.add(SINK_MAX_RETRIES);
+ optionalOptions.add(FactoryUtil.SINK_PARALLELISM);
+ optionalOptions.add(MAX_RETRY_TIMEOUT);
+ optionalOptions.add(DIALECT_IMPL);
+ return optionalOptions;
+ }
+
+ private void validateConfigOptions(ReadableConfig config) {
+ // register custom dialect first
+ config.getOptional(DIALECT_IMPL).ifPresent(JdbcDialects::register);
+ String jdbcUrl = config.get(URL);
+ final Optional<JdbcDialect> dialect = JdbcDialects.get(jdbcUrl);
+ checkState(dialect.isPresent(), "Cannot handle such jdbc url: " + jdbcUrl);
+
+ checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
+
+ checkAllOrNone(
+ config,
+ new ConfigOption[] {
+ SCAN_PARTITION_COLUMN,
+ SCAN_PARTITION_NUM,
+ SCAN_PARTITION_LOWER_BOUND,
+ SCAN_PARTITION_UPPER_BOUND
+ });
+
+ if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
+ && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
+ long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
+ long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND);
+ if (lowerBound > upperBound) {
+ throw new IllegalArgumentException(
+ String.format(
+ "'%s'='%s' must not be larger than '%s'='%s'.",
+ SCAN_PARTITION_LOWER_BOUND.key(),
+ lowerBound,
+ SCAN_PARTITION_UPPER_BOUND.key(),
+ upperBound));
+ }
+ }
+
+ checkAllOrNone(config, new ConfigOption[] {LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
+
+ if (config.get(LOOKUP_MAX_RETRIES) < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option shouldn't be negative, but is %s.",
+ LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES)));
+ }
+
+ if (config.get(SINK_MAX_RETRIES) < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option shouldn't be negative, but is %s.",
+ SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES)));
+ }
+
+ if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option must be in second granularity and shouldn't be "
+ + "smaller than 1 second, but is %s.",
+ MAX_RETRY_TIMEOUT.key(),
+ config.get(
+ ConfigOptions.key(MAX_RETRY_TIMEOUT.key())
+ .stringType()
+ .noDefaultValue())));
+ }
+ }
+
+ private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
+ int presentCount = 0;
+ for (ConfigOption configOption : configOptions) {
+ if (config.getOptional(configOption).isPresent()) {
+ presentCount++;
+ }
+ }
+ String[] propertyNames =
+ Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
+ Preconditions.checkArgument(
+ configOptions.length == presentCount || presentCount == 0,
+ "Either all or none of the following options should be provided:\n"
+ + String.join("\n", propertyNames));
+ }
+}
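Because validateConfigOptions() registers the 'dialect-impl' class before resolving the URL, a ClickHouse table can be declared directly against the jdbc-inlong connector. A sketch of such a DDL (not part of this commit; values are hypothetical):

    tableEnv.executeSql(
            "CREATE TABLE ck_sink (\n"
                    + "  id BIGINT,\n"
                    + "  name STRING\n"
                    + ") WITH (\n"
                    + "  'connector' = 'jdbc-inlong',\n"
                    + "  'dialect-impl' = 'org.apache.inlong.sort.flink.clickhouse.table.ClickHouseDialect',\n"
                    + "  'url' = 'jdbc:clickhouse://localhost:8123/default',\n"
                    + "  'table-name' = 'ck_demo',\n"
                    + "  'username' = 'default',\n"
                    + "  'password' = ''\n"
                    + ")");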
diff --git a/inlong-sort/sort-connectors/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 83e2d7a4d..e4286c7ff 100644
--- a/inlong-sort/sort-connectors/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-connectors/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory
+org.apache.inlong.sort.flink.jdbc.table.JdbcDynamicTableFactory
org.apache.inlong.sort.flink.pulsar.table.PulsarDynamicTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 6c10a1de2..ee48040d0 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -300,26 +300,6 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-postgres-cdc</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc_${flink.scala.binary.version}</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.postgresql</groupId>
- <artifactId>postgresql</artifactId>
- </dependency>
-
</dependencies>
<build>
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/ClickHouseSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/ClickHouseSqlParserTest.java
new file mode 100644
index 000000000..19b57cbba
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/ClickHouseSqlParserTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.flink.clickhouse.table.ClickHouseDialect;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link ClickHouseLoadNode} and {@link ClickHouseDialect}
+ */
+public class ClickHouseSqlParserTest {
+ public MySqlExtractNode buildMySQLExtractNode(String id) {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()));
+ Map<String, String> map = new HashMap<>();
+ map.put("append-mode", "true");
+ return new MySqlExtractNode(id, "mysql_input", fields,
+ null, map, "id",
+ Collections.singletonList("work1"), "localhost", "root", "123456",
+ "inlong", null, null,
+ null, null);
+ }
+
+ private ClickHouseLoadNode buildClickHouseLoadNode(String id) {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()));
+ List<FieldRelationShip> relations = Arrays
+ .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())));
+
+ return new ClickHouseLoadNode(id, "test_clickhouse",
+ fields,
+ relations,
+ null,
+ null,
+ 1,
+ null,
+ "ck_demo",
+ "jdbc:clickhouse://localhost:8123/demo",
+ "default",
+ "",
+ null);
+
+ }
+
+ /**
+ * Build the node relation.
+ *
+ * @param inputs the extract nodes
+ * @param outputs the load nodes
+ * @return the node relation
+ */
+ private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelationShip(inputIds, outputIds);
+ }
+
+ /**
+ * Test extracting data from MySQL and loading it into ClickHouse.
+ *
+ * @throws Exception Any exception that may be thrown during execution
+ */
+ @Test
+ public void testClickHouse() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildMySQLExtractNode("1");
+ Node outputNode = buildClickHouseLoadNode("2");
+ StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
+ Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("group_id", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ FlinkSqlParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+}