You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/28 11:10:26 UTC

[incubator-seatunnel] branch api-draft updated: [API-Draft] [Connector] Add Clickhouse source and sink connector (#2051)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 3057ba205 [API-Draft] [Connector] Add Clickhouse source and sink connector (#2051)
3057ba205 is described below

commit 3057ba205f1fa4807eee6b2fcbdc675a1cdd50d5
Author: Hisoka <fa...@qq.com>
AuthorDate: Tue Jun 28 19:10:19 2022 +0800

    [API-Draft] [Connector] Add Clickhouse source and sink connector (#2051)
---
 .../org/apache/seatunnel/api/sink/SinkWriter.java  |   2 +-
 .../seatunnel/api/table/type/SeaTunnelRowType.java |   9 +
 seatunnel-connectors/plugin-mapping.properties     |   3 +
 .../flink/clickhouse/sink/ClickhouseBatchSink.java |   3 -
 .../clickhouse/sink/ClickhouseOutputFormat.java    |   2 +-
 .../pom.xml                                        |  23 ++
 .../{Config.java => ClickhouseFileCopyMethod.java} |  37 ++--
 .../seatunnel/clickhouse/config/Config.java        |  71 +++++-
 .../clickhouse/config/FileReaderOption.java        | 113 ++++++++++
 .../seatunnel/clickhouse/config/ReaderOption.java  |  94 ++++++++
 .../seatunnel/clickhouse/shard/Shard.java          |  98 +++++++++
 .../seatunnel/clickhouse/shard/ShardMetadata.java  | 145 ++++++++++++
 .../clickhouse/sink/DistributedEngine.java         |  58 +++++
 .../sink/client/ClickhouseBatchStatement.java      |  52 +++++
 .../clickhouse/sink/client/ClickhouseProxy.java    | 212 ++++++++++++++++++
 .../clickhouse/sink/client/ClickhouseSink.java     | 172 +++++++++++++++
 .../sink/client/ClickhouseSinkWriter.java          | 230 +++++++++++++++++++
 .../clickhouse/sink/client/ShardRouter.java        |  97 ++++++++
 .../clickhouse/sink/file/ClickhouseFileSink.java   | 145 ++++++++++++
 .../sink/file/ClickhouseFileSinkWriter.java        | 243 +++++++++++++++++++++
 .../clickhouse/sink/file/ClickhouseTable.java      | 117 ++++++++++
 .../Config.java => sink/file/FileTransfer.java}    |  18 +-
 .../clickhouse/sink/file/ScpFileTransfer.java      | 124 +++++++++++
 .../inject/ArrayInjectFunction.java}               |  26 ++-
 .../inject/BigDecimalInjectFunction.java}          |  26 ++-
 .../sink/inject/ClickhouseFieldInjectFunction.java |  46 ++++
 .../inject/DateInjectFunction.java}                |  37 ++--
 .../inject/DateTimeInjectFunction.java}            |  37 ++--
 .../sink/inject/DoubleInjectFunction.java          |  41 ++++
 .../inject/FloatInjectFunction.java}               |  37 ++--
 .../clickhouse/sink/inject/IntInjectFunction.java  |  47 ++++
 .../inject/LongInjectFunction.java}                |  27 +--
 .../inject/StringInjectFunction.java}              |  25 +--
 .../clickhouse/source/ClickhouseSource.java        |  18 +-
 .../source/ClickhouseSourceSplitEnumerator.java    |   1 +
 .../Config.java => state/CKAggCommitInfo.java}     |  18 +-
 .../Config.java => state/CKCommitInfo.java}        |  18 +-
 .../Config.java => state/ClickhouseSinkState.java} |  18 +-
 .../{config/Config.java => tool/IntHolder.java}    |  22 +-
 .../seatunnel/clickhouse/util/ClickhouseUtil.java  |  40 ++++
 seatunnel-dist/release-docs/LICENSE                |   5 +-
 seatunnel-dist/release-docs/NOTICE                 |  12 +
 .../plugin/discovery/AbstractPluginDiscovery.java  |   4 +-
 .../spark/serialization/InternalRowConverter.java  |   3 +
 .../translation/spark/sink/SparkDataWriter.java    |   1 +
 tools/dependencies/known-dependencies.txt          |   5 +
 46 files changed, 2373 insertions(+), 209 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 56f97bac9..268d3d40e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -74,7 +74,7 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
      */
     void close() throws IOException;
 
-    interface Context {
+    interface Context extends Serializable{
 
         /**
          * Gets the configuration with which Job was started.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
index 848cdf43c..45164c246 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
@@ -76,6 +76,15 @@ public class SeaTunnelRowType implements CompositeType<SeaTunnelRow> {
         return fieldTypes[index];
     }
 
+    public int indexOf(String fieldName) {
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (fieldNames[i].equals(fieldName)) {
+                return i;
+            }
+        }
+        throw new IllegalArgumentException(String.format("can't find field %s", fieldName));
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (this == obj) {
diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties
index d06354594..b14f2f035 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -92,5 +92,8 @@ seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
 seatunnel.source.Http = seatunnel-connector-seatunnel-http
 seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
 seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
+seatunnel.source.Clickhouse = seatunnel-connector-seatunnel-clickhouse
+seatunnel.sink.Clickhouse = seatunnel-connector-seatunnel-clickhouse
+seatunnel.sink.ClickhouseFile = seatunnel-connector-seatunnel-clickhouse
 seatunnel.source.Jdbc = seatunnel-connector-seatunnel-jdbc
 seatunnel.sink.Jdbc = seatunnel-connector-seatunnel-jdbc
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
index 620e921e1..36ab1a3d5 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
@@ -46,8 +46,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.types.Row;
 import ru.yandex.clickhouse.ClickHouseConnection;
 
-import javax.annotation.Nullable;
-
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -73,7 +71,6 @@ public class ClickhouseBatchSink implements FlinkBatchSink {
         return config;
     }
 
-    @Nullable
     @Override
     public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
         ClickhouseOutputFormat clickhouseOutputFormat = new ClickhouseOutputFormat(config, shardMetadata, fields, tableSchema);
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java
index cb392b9a0..efd4eb7d5 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java
@@ -224,7 +224,7 @@ public class ClickhouseOutputFormat extends RichOutputFormat<Row> {
                     break;
                 }
             }
-            result.put(field, function);
+            result.put(fieldType, function);
         }
         return result;
     }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
index e3fb148aa..38c2f6db4 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
@@ -36,12 +36,35 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.sshd</groupId>
+            <artifactId>sshd-scp</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+
         <!-- TODO add to dependency management after version unify -->
         <dependency>
             <groupId>com.clickhouse</groupId>
             <artifactId>clickhouse-http-client</artifactId>
             <version>0.3.2-patch9</version>
         </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.11.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.3.2-patch9</version>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
similarity index 57%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
index 65b7af7c6..cec1f48bb 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
@@ -17,19 +17,26 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
-
-    public static final String SQL = "sql";
-
-    public static final String USERNAME = "username";
-
-    public static final String PASSWORD = "password";
-
+public enum ClickhouseFileCopyMethod {
+    SCP("scp"),
+    RSYNC("rsync"),
+    ;
+    private final String name;
+
+    ClickhouseFileCopyMethod(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public static ClickhouseFileCopyMethod from(String name) {
+        for (ClickhouseFileCopyMethod clickhouseFileCopyMethod : ClickhouseFileCopyMethod.values()) {
+            if (clickhouseFileCopyMethod.getName().equalsIgnoreCase(name)) {
+                return clickhouseFileCopyMethod;
+            }
+        }
+        throw new IllegalArgumentException("Unknown ClickhouseFileCopyMethod: " + name);
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
index 65b7af7c6..6563274ba 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
@@ -17,19 +17,80 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
 
-/**
- * The config of clickhouse
- */
 public class Config {
 
-    public static final String NODE_ADDRESS = "node_address";
+    /**
+     * Bulk size of clickhouse jdbc
+     */
+    public static final String BULK_SIZE = "bulk_size";
 
-    public static final String DATABASE = "database";
+    /**
+     * Clickhouse fields
+     */
+    public static final String FIELDS = "fields";
 
     public static final String SQL = "sql";
 
+    /**
+     * Clickhouse server host
+     */
+    public static final String HOST = "host";
+
+    /**
+     * Clickhouse table name
+     */
+    public static final String TABLE = "table";
+
+    /**
+     * Clickhouse database name
+     */
+    public static final String DATABASE = "database";
+
+    /**
+     * Clickhouse server username
+     */
     public static final String USERNAME = "username";
 
+    /**
+     * Clickhouse server password
+     */
     public static final String PASSWORD = "password";
 
+    /**
+     * Split mode when table is distributed engine
+     */
+    public static final String SPLIT_MODE = "split_mode";
+
+    /**
+     * When split_mode is true, the sharding_key used for splitting
+     */
+    public static final String SHARDING_KEY = "sharding_key";
+
+    /**
+     * The path of the clickhouse-local program used by the ClickhouseFile sink connector
+     */
+    public static final String CLICKHOUSE_LOCAL_PATH = "clickhouse_local_path";
+
+    /**
+     * The method used to copy Clickhouse files
+     */
+    public static final String COPY_METHOD = "copy_method";
+
+    /**
+     * The size of each batch when reading temporary data into the local file.
+     */
+    public static final String TMP_BATCH_CACHE_LINE = "tmp_batch_cache_line";
+
+    /**
+     * The password of Clickhouse server node
+     */
+    public static final String NODE_PASS = "node_pass";
+
+    /**
+     * The address of Clickhouse server node
+     */
+    public static final String NODE_ADDRESS = "node_address";
+
+    public static final String CLICKHOUSE_PREFIX = "clickhouse.";
+
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
new file mode 100644
index 000000000..720810789
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class FileReaderOption implements Serializable {
+
+    private ShardMetadata shardMetadata;
+    private Map<String, String> tableSchema;
+    private List<String> fields;
+    private String clickhouseLocalPath;
+    private ClickhouseFileCopyMethod copyMethod;
+    private boolean nodeFreePass;
+    private Map<String, String> nodePassword;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    public FileReaderOption(ShardMetadata shardMetadata, Map<String, String> tableSchema,
+                            List<String> fields, String clickhouseLocalPath,
+                            ClickhouseFileCopyMethod copyMethod,
+                            Map<String, String> nodePassword) {
+        this.shardMetadata = shardMetadata;
+        this.tableSchema = tableSchema;
+        this.fields = fields;
+        this.clickhouseLocalPath = clickhouseLocalPath;
+        this.copyMethod = copyMethod;
+        this.nodePassword = nodePassword;
+    }
+
+    public SeaTunnelRowType getSeaTunnelRowType() {
+        return seaTunnelRowType;
+    }
+
+    public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    public boolean isNodeFreePass() {
+        return nodeFreePass;
+    }
+
+    public void setNodeFreePass(boolean nodeFreePass) {
+        this.nodeFreePass = nodeFreePass;
+    }
+
+    public String getClickhouseLocalPath() {
+        return clickhouseLocalPath;
+    }
+
+    public void setClickhouseLocalPath(String clickhouseLocalPath) {
+        this.clickhouseLocalPath = clickhouseLocalPath;
+    }
+
+    public ClickhouseFileCopyMethod getCopyMethod() {
+        return copyMethod;
+    }
+
+    public void setCopyMethod(ClickhouseFileCopyMethod copyMethod) {
+        this.copyMethod = copyMethod;
+    }
+
+    public Map<String, String> getNodePassword() {
+        return nodePassword;
+    }
+
+    public void setNodePassword(Map<String, String> nodePassword) {
+        this.nodePassword = nodePassword;
+    }
+
+    public ShardMetadata getShardMetadata() {
+        return shardMetadata;
+    }
+
+    public void setShardMetadata(ShardMetadata shardMetadata) {
+        this.shardMetadata = shardMetadata;
+    }
+
+    public Map<String, String> getTableSchema() {
+        return tableSchema;
+    }
+
+    public void setTableSchema(Map<String, String> tableSchema) {
+        this.tableSchema = tableSchema;
+    }
+
+    public List<String> getFields() {
+        return fields;
+    }
+
+    public void setFields(List<String> fields) {
+        this.fields = fields;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
new file mode 100644
index 000000000..084f54bcc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ReaderOption implements Serializable {
+
+    private ShardMetadata shardMetadata;
+    private List<String> fields;
+
+    private Map<String, String> tableSchema;
+    private SeaTunnelRowType seaTunnelRowType;
+    private Properties properties;
+    private int bulkSize;
+
+    public ReaderOption(ShardMetadata shardMetadata,
+                        Properties properties, List<String> fields, Map<String, String> tableSchema, int bulkSize) {
+        this.shardMetadata = shardMetadata;
+        this.properties = properties;
+        this.fields = fields;
+        this.tableSchema = tableSchema;
+        this.bulkSize = bulkSize;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public ShardMetadata getShardMetadata() {
+        return shardMetadata;
+    }
+
+    public void setShardMetadata(ShardMetadata shardMetadata) {
+        this.shardMetadata = shardMetadata;
+    }
+
+    public SeaTunnelRowType getSeaTunnelRowType() {
+        return seaTunnelRowType;
+    }
+
+    public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    public Map<String, String> getTableSchema() {
+        return tableSchema;
+    }
+
+    public void setTableSchema(Map<String, String> tableSchema) {
+        this.tableSchema = tableSchema;
+    }
+
+    public List<String> getFields() {
+        return fields;
+    }
+
+    public void setFields(List<String> fields) {
+        this.fields = fields;
+    }
+
+    public int getBulkSize() {
+        return bulkSize;
+    }
+
+    public void setBulkSize(int bulkSize) {
+        this.bulkSize = bulkSize;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
new file mode 100644
index 000000000..2fe726325
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard;
+
+import com.clickhouse.client.ClickHouseCredentials;
+import com.clickhouse.client.ClickHouseNode;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+public class Shard implements Serializable {
+    private static final long serialVersionUID = -1L;
+
+    private final int shardNum;
+    private final int replicaNum;
+
+    private final ClickHouseNode node;
+
+    // cache the hash code
+    private int hashCode = -1;
+
+    public Shard(int shardNum,
+                 int shardWeight,
+                 int replicaNum,
+                 String hostname,
+                 String hostAddress,
+                 int port,
+                 String database,
+                 String username,
+                 String password) {
+        this.shardNum = shardNum;
+        this.replicaNum = replicaNum;
+        this.node = ClickHouseNode.builder().host(hostname).address(InetSocketAddress.createUnresolved(hostAddress,
+                port)).database(database).weight(shardWeight).credentials(ClickHouseCredentials.fromUserAndPassword(username, password)).build();
+    }
+
+    public Shard(int shardNum, int replicaNum, ClickHouseNode node) {
+        this.shardNum = shardNum;
+        this.replicaNum = replicaNum;
+        this.node = node;
+    }
+
+    public int getShardNum() {
+        return shardNum;
+    }
+
+    public int getReplicaNum() {
+        return replicaNum;
+    }
+
+    public ClickHouseNode getNode() {
+        return node;
+    }
+
+    public String getJdbcUrl() {
+        return "jdbc:clickhouse://" + node.getAddress().getHostName()
+                + ":" + node.getAddress().getPort() + "/" + node.getDatabase().get();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Shard shard = (Shard) o;
+        // Do not compare the lazily cached hashCode: it stays -1 until hashCode() is first called.
+        return shardNum == shard.shardNum
+                && replicaNum == shard.replicaNum
+                && Objects.equals(node, shard.node);
+    }
+
+    @Override
+    public int hashCode() {
+        if (hashCode == -1) {
+            hashCode = Objects.hash(shardNum, replicaNum, node, hashCode);
+        }
+        return hashCode;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
new file mode 100644
index 000000000..3c01922f1
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class ShardMetadata implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    private String shardKey;
+    private String shardKeyType;
+    private String database;
+    private String table;
+    private boolean splitMode;
+    private Shard defaultShard;
+    private String username;
+    private String password;
+
+    public ShardMetadata(String shardKey,
+                         String shardKeyType,
+                         String database,
+                         String table,
+                         boolean splitMode,
+                         Shard defaultShard,
+                         String username,
+                         String password) {
+        this.shardKey = shardKey;
+        this.shardKeyType = shardKeyType;
+        this.database = database;
+        this.table = table;
+        this.splitMode = splitMode;
+        this.defaultShard = defaultShard;
+        this.username = username;
+        this.password = password;
+    }
+
+    public String getShardKey() {
+        return shardKey;
+    }
+
+    public void setShardKey(String shardKey) {
+        this.shardKey = shardKey;
+    }
+
+    public String getShardKeyType() {
+        return shardKeyType;
+    }
+
+    public void setShardKeyType(String shardKeyType) {
+        this.shardKeyType = shardKeyType;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public boolean getSplitMode() {
+        return splitMode;
+    }
+
+    public void setSplitMode(boolean splitMode) {
+        this.splitMode = splitMode;
+    }
+
+    public Shard getDefaultShard() {
+        return defaultShard;
+    }
+
+    public void setDefaultShard(Shard defaultShard) {
+        this.defaultShard = defaultShard;
+    }
+
+    public boolean isSplitMode() {
+        return splitMode;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ShardMetadata that = (ShardMetadata) o;
+        return splitMode == that.splitMode
+                && Objects.equals(shardKey, that.shardKey)
+                && Objects.equals(shardKeyType, that.shardKeyType)
+                && Objects.equals(database, that.database)
+                && Objects.equals(table, that.table)
+                && Objects.equals(defaultShard, that.defaultShard)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(shardKey, shardKeyType, database, table, splitMode, defaultShard, username, password);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
new file mode 100644
index 000000000..6a15d5919
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder for the three leading arguments of a ClickHouse {@code Distributed}
+ * engine definition: the cluster name, the database and the table.
+ *
+ * <p>The class is a plain mutable bean; it performs no validation on the values it stores.
+ */
+public class DistributedEngine implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+    // First engine argument: name of the cluster.
+    private String clusterName;
+    // Second engine argument: database name.
+    private String database;
+    // Third engine argument: table name.
+    private String table;
+
+    /**
+     * Creates a holder with the given values.
+     *
+     * @param clusterName cluster name.
+     * @param database    database name.
+     * @param table       table name.
+     */
+    public DistributedEngine(String clusterName, String database, String table) {
+        this.clusterName = clusterName;
+        this.database = database;
+        this.table = table;
+    }
+
+    /** @return the stored cluster name. */
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    /** @param clusterName the cluster name to store. */
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    /** @return the stored database name. */
+    public String getDatabase() {
+        return database;
+    }
+
+    /** @param database the database name to store. */
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    /** @return the stored table name. */
+    public String getTable() {
+        return table;
+    }
+
+    /** @param table the table name to store. */
+    public void setTable(String table) {
+        this.table = table;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
new file mode 100644
index 000000000..ae525acee
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
+
+import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
+
+import java.sql.PreparedStatement;
+
+/**
+ * Immutable bundle of the three objects that make up one batch: a JDBC connection,
+ * a prepared statement and an {@link IntHolder}.
+ *
+ * <p>The class only stores the references handed to its constructor; it does not open,
+ * flush or close any of them.
+ */
+public class ClickhouseBatchStatement {
+
+    private final ClickHouseConnectionImpl clickHouseConnection;
+    private final PreparedStatement preparedStatement;
+    private final IntHolder intHolder;
+
+    /**
+     * @param clickHouseConnection connection to keep alongside the statement.
+     * @param preparedStatement    prepared statement to keep.
+     * @param intHolder            integer holder to keep alongside the statement.
+     */
+    public ClickhouseBatchStatement(ClickHouseConnectionImpl clickHouseConnection,
+                                    PreparedStatement preparedStatement,
+                                    IntHolder intHolder) {
+        this.clickHouseConnection = clickHouseConnection;
+        this.preparedStatement = preparedStatement;
+        this.intHolder = intHolder;
+    }
+
+    /** @return the connection passed to the constructor. */
+    public ClickHouseConnectionImpl getClickHouseConnection() {
+        return clickHouseConnection;
+    }
+
+    /** @return the prepared statement passed to the constructor. */
+    public PreparedStatement getPreparedStatement() {
+        return preparedStatement;
+    }
+
+    /** @return the integer holder passed to the constructor. */
+    public IntHolder getIntHolder() {
+        return intHolder;
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
new file mode 100644
index 000000000..3a3aa082c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
+
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseFormat;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseRecord;
+import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.ClickHouseResponse;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Helper around the ClickHouse Java client that runs the metadata queries needed by the
+ * connector (table schema, table engine, cluster shard list).
+ *
+ * <p>One client and one request are created for the node given to the constructor; additional
+ * clients are created lazily, one per {@link Shard}, and cached. All clients are released by
+ * {@link #close()}.
+ */
+@SuppressWarnings("magicnumber")
+public class ClickhouseProxy {
+
+    /** Prefix of the {@code engine_full} column for a Distributed table. */
+    private static final String DISTRIBUTED_ENGINE_PREFIX = "Distributed(";
+    /** A Distributed engine definition has at least: cluster, database, table. */
+    private static final int DISTRIBUTED_ENGINE_MIN_ARGS = 3;
+
+    private final ClickHouseRequest<?> clickhouseRequest;
+    private final ClickHouseClient client;
+
+    // Per-shard clients, created on first use by getClickhouseConnection(Shard).
+    private final Map<Shard, ClickHouseClient> shardToDataSource = new ConcurrentHashMap<>(16);
+
+    /**
+     * Creates a client for the protocol of {@code node} and a request bound to that node,
+     * using the {@code RowBinaryWithNamesAndTypes} format.
+     *
+     * @param node node that the default request is connected to.
+     */
+    public ClickhouseProxy(ClickHouseNode node) {
+        this.client = ClickHouseClient.newInstance(node.getProtocol());
+        this.clickhouseRequest =
+                client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
+
+    }
+
+    /**
+     * @return the request created in the constructor for the default node.
+     */
+    public ClickHouseRequest<?> getClickhouseConnection() {
+        return this.clickhouseRequest;
+    }
+
+    /**
+     * Returns a new request connected to the node of the given shard. The underlying client is
+     * created on first use for that shard and reused afterwards.
+     *
+     * @param shard shard whose node should be queried.
+     * @return a request bound to {@code shard.getNode()}.
+     */
+    public ClickHouseRequest<?> getClickhouseConnection(Shard shard) {
+        ClickHouseClient c = shardToDataSource
+                .computeIfAbsent(shard, s -> ClickHouseClient.newInstance(s.getNode().getProtocol()));
+        return c.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
+    }
+
+    /**
+     * Looks up the Distributed engine definition of a table using the default request.
+     *
+     * @param database database of the distributed table.
+     * @param table    name of the distributed table.
+     * @return cluster, database and table referenced by the Distributed engine.
+     */
+    public DistributedEngine getClickhouseDistributedTable(String database, String table) {
+        ClickHouseRequest<?> request = getClickhouseConnection();
+        return getClickhouseDistributedTable(request, database, table);
+    }
+
+    /**
+     * Looks up the Distributed engine definition of a table from {@code system.tables}.
+     *
+     * @param connection request used to run the query.
+     * @param database   database of the distributed table.
+     * @param table      name of the distributed table.
+     * @return cluster, database and table referenced by the Distributed engine.
+     * @throws RuntimeException if the query fails, returns no row, or the engine definition
+     *                          has fewer than three arguments.
+     */
+    public DistributedEngine getClickhouseDistributedTable(ClickHouseRequest<?> connection, String database,
+                                                           String table) {
+        String sql = String.format("select engine_full from system.tables where database = '%s' and name = '%s' and engine = 'Distributed'", database, table);
+        try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
+            List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
+            if (!records.isEmpty()) {
+                ClickHouseRecord record = records.get(0);
+                // engineFull field will be like : Distributed(cluster, database, table[, sharding_key[, policy_name]])
+                String engineFull = record.getValue(0).asString();
+                List<String> infos = Arrays.stream(engineFull.substring(DISTRIBUTED_ENGINE_PREFIX.length()).split(","))
+                        .map(s -> s.replace("'", "").trim()).collect(Collectors.toList());
+                if (infos.size() < DISTRIBUTED_ENGINE_MIN_ARGS) {
+                    throw new RuntimeException("Cannot parse distributed engine definition: " + engineFull);
+                }
+                // String#replace matches literally (it is not a regex), so the closing parenthesis
+                // must be given as ")". It is still attached to the table name when the engine has
+                // exactly three arguments.
+                return new DistributedEngine(infos.get(0), infos.get(1), infos.get(2).replace(")", "").trim());
+            }
+            throw new RuntimeException("Cannot get distributed table from clickhouse, resultSet is empty");
+        } catch (ClickHouseException e) {
+            throw new RuntimeException("Cannot get distributed table from clickhouse", e);
+        }
+    }
+
+    /**
+     * Get ClickHouse table schema, the key is field name, value is value type.
+     *
+     * @param table table name.
+     * @return schema map.
+     */
+    public Map<String, String> getClickhouseTableSchema(String table) {
+        ClickHouseRequest<?> request = getClickhouseConnection();
+        return getClickhouseTableSchema(request, table);
+    }
+
+    /**
+     * Get ClickHouse table schema by running {@code desc <table>} on the given request.
+     * The returned map keeps the column order of the query result.
+     *
+     * @param request request used to run the query.
+     * @param table   table name.
+     * @return schema map, the key is field name, value is value type.
+     * @throws RuntimeException if the query fails.
+     */
+    public Map<String, String> getClickhouseTableSchema(ClickHouseRequest<?> request, String table) {
+        String sql = "desc " + table;
+        Map<String, String> schema = new LinkedHashMap<>();
+        try (ClickHouseResponse response = request.query(sql).executeAndWait()) {
+            response.records().forEach(r -> schema.put(r.getValue(0).asString(), r.getValue(1).asString()));
+        } catch (ClickHouseException e) {
+            throw new RuntimeException("Cannot get table schema from clickhouse", e);
+        }
+        return schema;
+    }
+
+    /**
+     * Get the shard of the given cluster.
+     *
+     * @param connection  clickhouse connection.
+     * @param clusterName cluster name.
+     * @param database    database of the shard.
+     * @param port        port of the shard.
+     * @param username    user name passed to every created shard.
+     * @param password    password passed to every created shard.
+     * @return shard list.
+     * @throws RuntimeException if the query fails.
+     */
+    public List<Shard> getClusterShardList(ClickHouseRequest<?> connection, String clusterName,
+                                           String database, int port, String username, String password) {
+        String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "'";
+        List<Shard> shardList = new ArrayList<>();
+        try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
+            response.records().forEach(r -> {
+                // The port column of the result is not read; the caller supplied port is used.
+                shardList.add(new Shard(
+                        r.getValue(0).asInteger(),
+                        r.getValue(1).asInteger(),
+                        r.getValue(2).asInteger(),
+                        r.getValue(3).asString(),
+                        r.getValue(4).asString(),
+                        port, database, username, password));
+            });
+            return shardList;
+        } catch (ClickHouseException e) {
+            throw new RuntimeException("Cannot get cluster shard list from clickhouse", e);
+        }
+    }
+
+    /**
+     * Get ClickHouse table info.
+     *
+     * <p>For a table whose engine is {@code Distributed}, the engine definition is resolved and
+     * the create-table DDL is replaced by the (localized) DDL of the referenced local table.
+     *
+     * @param database database of the table.
+     * @param table    table name of the table.
+     * @return clickhouse table info.
+     * @throws RuntimeException if a query fails or a looked-up table is not found.
+     */
+    public ClickhouseTable getClickhouseTable(String database, String table) {
+        String sql = String.format("select engine,create_table_query,engine_full,data_paths from system.tables where database = '%s' and name = '%s'", database, table);
+        try (ClickHouseResponse response = clickhouseRequest.query(sql).executeAndWait()) {
+            List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
+            if (records.isEmpty()) {
+                throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty");
+            }
+            ClickHouseRecord record = records.get(0);
+            String engine = record.getValue(0).asString();
+            String createTableDDL = record.getValue(1).asString();
+            String engineFull = record.getValue(2).asString();
+            List<String> dataPaths = record.getValue(3).asTuple().stream().map(Object::toString).collect(Collectors.toList());
+            DistributedEngine distributedEngine = null;
+            if ("Distributed".equals(engine)) {
+                distributedEngine = getClickhouseDistributedTable(clickhouseRequest, database, table);
+                String localTableSQL = String.format("select engine,create_table_query from system.tables where database = '%s' and name = '%s'",
+                        distributedEngine.getDatabase(), distributedEngine.getTable());
+                try (ClickHouseResponse rs = clickhouseRequest.query(localTableSQL).executeAndWait()) {
+                    List<ClickHouseRecord> localTableRecords = rs.stream().collect(Collectors.toList());
+                    if (localTableRecords.isEmpty()) {
+                        throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty");
+                    }
+                    String localEngine = localTableRecords.get(0).getValue(0).asString();
+                    String createLocalTableDDL = localTableRecords.get(0).getValue(1).asString();
+                    createTableDDL = localizationEngine(localEngine, createLocalTableDDL);
+                }
+            }
+            return new ClickhouseTable(
+                    database,
+                    table,
+                    distributedEngine,
+                    engine,
+                    createTableDDL,
+                    engineFull,
+                    dataPaths,
+                    getClickhouseTableSchema(clickhouseRequest, table));
+        } catch (ClickHouseException e) {
+            throw new RuntimeException("Cannot get clickhouse table", e);
+        }
+
+    }
+
+    /**
+     * Localization the engine in clickhouse local table's createTableDDL to support specific engine.
+     * For example: change ReplicatedMergeTree to MergeTree.
+     *
+     * @param engine original engine of clickhouse local table
+     * @param ddl    createTableDDL of clickhouse local table
+     * @return createTableDDL of clickhouse local table which can support specific engine
+     * TODO: support more engine
+     */
+    public String localizationEngine(String engine, String ddl) {
+        if ("ReplicatedMergeTree".equalsIgnoreCase(engine)) {
+            return ddl.replaceAll("ReplicatedMergeTree(\\([^\\)]*\\))", "MergeTree()");
+        } else {
+            return ddl;
+        }
+    }
+
+    /**
+     * Closes the default client and every per-shard client created so far.
+     */
+    public void close() {
+        if (this.client != null) {
+            this.client.close();
+        }
+        shardToDataSource.values().forEach(ClickHouseClient::close);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
new file mode 100644
index 000000000..295547c74
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.BULK_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_PREFIX;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SPLIT_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.clickhouse.client.ClickHouseNode;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * SeaTunnel sink plugin named {@code Clickhouse}.
+ *
+ * <p>{@link #prepare(Config)} validates the configuration, reads the target table schema from
+ * ClickHouse and stores everything the writer needs in a {@link ReaderOption}; writers are then
+ * created from that option.
+ */
+@AutoService(SeaTunnelSink.class)
+public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
+
+    private SeaTunnelContext seaTunnelContext;
+    // Built by prepare(); read by createWriter(), setTypeInfo() and getConsumedType().
+    private ReaderOption option;
+
+    @Override
+    public String getPluginName() {
+        return "Clickhouse";
+    }
+
+    /**
+     * Validates the sink configuration and builds the {@link ReaderOption} used by writers.
+     *
+     * <p>Host, database, table, username and password are mandatory. Bulk size defaults to
+     * 20000 and split mode defaults to {@code false}. In split mode the target table must use
+     * the {@code Distributed} engine. When a field list is configured, every field must exist
+     * in the table schema; otherwise all table columns are used.
+     *
+     * @param config plugin configuration.
+     * @throws PrepareFailException     if a mandatory option is missing.
+     * @throws IllegalArgumentException if split mode is enabled for a non-Distributed table.
+     * @throws RuntimeException         if a configured field is not part of the table schema.
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, DATABASE, TABLE, USERNAME, PASSWORD);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        Map<String, Object> defaultConfig = ImmutableMap.<String, Object>builder()
+                .put(BULK_SIZE, 20_000)
+                .put(SPLIT_MODE, false)
+                .build();
+
+        config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
+
+        List<ClickHouseNode> nodes = ClickhouseUtil.createNodes(config.getString(HOST),
+                config.getString(DATABASE), config.getString(USERNAME), config.getString(PASSWORD));
+
+        Properties clickhouseProperties = new Properties();
+        if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) {
+            TypesafeConfigUtils.extractSubConfig(config, CLICKHOUSE_PREFIX, false).entrySet().forEach(e -> {
+                clickhouseProperties.put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
+            });
+        }
+        // Credentials are written last so they override any value from the prefixed sub-config.
+        clickhouseProperties.put("user", config.getString(USERNAME));
+        clickhouseProperties.put("password", config.getString(PASSWORD));
+
+        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
+        try {
+            Map<String, String> tableSchema = proxy.getClickhouseTableSchema(config.getString(TABLE));
+            String shardKey = null;
+            String shardKeyType = null;
+            if (config.getBoolean(SPLIT_MODE)) {
+                ClickhouseTable table = proxy.getClickhouseTable(config.getString(DATABASE),
+                        config.getString(TABLE));
+                if (!"Distributed".equals(table.getEngine())) {
+                    throw new IllegalArgumentException("split mode only support table which engine is " +
+                            "'Distributed' engine at now");
+                }
+                if (config.hasPath(SHARDING_KEY)) {
+                    shardKey = config.getString(SHARDING_KEY);
+                    shardKeyType = tableSchema.get(shardKey);
+                }
+            }
+            ShardMetadata metadata = new ShardMetadata(
+                    shardKey,
+                    shardKeyType,
+                    config.getString(DATABASE),
+                    config.getString(TABLE),
+                    config.getBoolean(SPLIT_MODE),
+                    new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), config.getString(PASSWORD));
+            List<String> fields = new ArrayList<>();
+            if (config.hasPath(FIELDS)) {
+                fields.addAll(config.getStringList(FIELDS));
+                // check if the fields exist in schema
+                for (String field : fields) {
+                    if (!tableSchema.containsKey(field)) {
+                        throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE));
+                    }
+                }
+            } else {
+                fields.addAll(tableSchema.keySet());
+            }
+            this.option = new ReaderOption(metadata, clickhouseProperties, fields, tableSchema, config.getInt(BULK_SIZE));
+        } finally {
+            // Release the metadata clients even when a lookup or validation above fails.
+            proxy.close();
+        }
+    }
+
+    /**
+     * Creates a writer from the option built by {@link #prepare(Config)}.
+     */
+    @Override
+    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> createWriter(SinkWriter.Context context) throws IOException {
+        return new ClickhouseSinkWriter(option, context);
+    }
+
+    /**
+     * Delegates to the default {@link SeaTunnelSink#restoreWriter} implementation.
+     */
+    @Override
+    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> restoreWriter(SinkWriter.Context context, List<ClickhouseSinkState> states) throws IOException {
+        return SeaTunnelSink.super.restoreWriter(context, states);
+    }
+
+    @Override
+    public Optional<Serializer<ClickhouseSinkState>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    /**
+     * Stores the row type on the option; requires {@link #prepare(Config)} to have run first,
+     * because the option is created there.
+     */
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.option.setSeaTunnelRowType(seaTunnelRowType);
+    }
+
+    /**
+     * @return the row type previously stored on the option by {@link #setTypeInfo}.
+     */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.option.getSeaTunnelRowType();
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
new file mode 100644
index 000000000..604f2c609
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ClickhouseFieldInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateTimeInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
+
+import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSinkWriter.class);
+
+    // Writer context handed over by the framework; stored by the constructor.
+    private final SinkWriter.Context context;
+    // Sink options (fields, table schema, bulk size, shard metadata, JDBC properties).
+    private final ReaderOption option;
+    // Selects the target shard for a given shard key value.
+    private final ShardRouter shardRouter;
+    // Metadata client bound to the default shard's node; closed in close().
+    private final transient ClickhouseProxy proxy;
+    // INSERT statement text shared by the prepared statements of all shards.
+    private final String prepareSql;
+    // One connection + prepared statement + pending-row counter per shard.
+    private final Map<Shard, ClickhouseBatchStatement> statementMap;
+    // Looked up by ClickHouse column type string when binding a value.
+    private final Map<String, ClickhouseFieldInjectFunction> fieldInjectFunctionMap;
+    // Used when no inject function is registered for a column type.
+    private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION = new StringInjectFunction();
+
+    // Match type strings of the form Nullable(...) / LowCardinality(...), capturing the inner type.
+    private static final Pattern NULLABLE = Pattern.compile("Nullable\\((.*)\\)");
+    private static final Pattern LOW_CARDINALITY = Pattern.compile("LowCardinality\\((.*)\\)");
+
+    /**
+     * Creates the writer and opens everything it needs: a proxy on the default shard's node,
+     * the shard router, the INSERT statement text and one prepared statement per shard.
+     *
+     * @param option  sink options.
+     * @param context writer context.
+     */
+    ClickhouseSinkWriter(ReaderOption option, SinkWriter.Context context) {
+        this.option = option;
+        this.context = context;
+
+        this.proxy = new ClickhouseProxy(option.getShardMetadata().getDefaultShard().getNode());
+        this.fieldInjectFunctionMap = initFieldInjectFunctionMap();
+        // Order matters below: initPrepareSQL() reads shardRouter, and initStatementMap()
+        // reads both shardRouter and prepareSql.
+        this.shardRouter = new ShardRouter(proxy, option.getShardMetadata());
+        this.prepareSql = initPrepareSQL();
+        this.statementMap = initStatementMap();
+    }
+
+    /**
+     * Adds one row to the batch of the shard chosen by the shard router and executes that
+     * batch once its pending-row counter reaches the configured bulk size.
+     *
+     * <p>When a shard key is configured, its value is read from the row by the key's index in
+     * the row type; otherwise {@code null} is passed to the router.
+     *
+     * @param element row to write.
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        final String shardKeyName = this.option.getShardMetadata().getShardKey();
+        final Object shardKeyValue = StringUtils.isNotEmpty(shardKeyName)
+                ? element.getField(this.option.getSeaTunnelRowType().indexOf(shardKeyName))
+                : null;
+
+        final ClickhouseBatchStatement batch = statementMap.get(shardRouter.getShard(shardKeyValue));
+        final PreparedStatement insert = batch.getPreparedStatement();
+        final IntHolder pendingRows = batch.getIntHolder();
+
+        // Buffer the row and count it.
+        addIntoBatch(element, insert);
+        pendingRows.setValue(pendingRows.getValue() + 1);
+
+        // Nothing more to do until the batch is full.
+        if (pendingRows.getValue() < option.getBulkSize()) {
+            return;
+        }
+        flush(insert);
+        pendingRows.setValue(0);
+    }
+
+    /**
+     * Always returns an empty commit info.
+     *
+     * <p>NOTE(review): this method does not flush rows that are still buffered in the
+     * per-shard batches; confirm whether a flush is expected at this point.
+     *
+     * @return {@link Optional#empty()}.
+     */
+    @Override
+    public Optional<CKCommitInfo> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    /**
+     * Intentionally does nothing.
+     */
+    @Override
+    public void abortPrepare() {
+
+    }
+
+    /**
+     * Flushes the rows still buffered for every shard, closes each shard's prepared statement
+     * and connection, and finally closes the proxy.
+     *
+     * <p>A failure on one shard no longer stops the clean-up of the remaining shards: every
+     * shard is processed and the proxy is always closed. The first failure is rethrown
+     * afterwards, with later failures attached as suppressed exceptions.
+     *
+     * @throws RuntimeException if flushing or closing failed for any shard, or if closing the
+     *                          proxy failed.
+     */
+    @Override
+    public void close() throws IOException {
+        RuntimeException failure = null;
+        for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
+            try (ClickHouseConnectionImpl needClosedConnection = batchStatement.getClickHouseConnection();
+                 PreparedStatement needClosedStatement = batchStatement.getPreparedStatement()) {
+                IntHolder intHolder = batchStatement.getIntHolder();
+                if (intHolder.getValue() > 0) {
+                    flush(needClosedStatement);
+                    intHolder.setValue(0);
+                }
+            } catch (SQLException | RuntimeException e) {
+                // Remember the failure but keep going so the other shards are still released.
+                RuntimeException current = e instanceof RuntimeException
+                        ? (RuntimeException) e
+                        : new RuntimeException("Failed to close prepared statement.", e);
+                if (failure == null) {
+                    failure = current;
+                } else {
+                    failure.addSuppressed(current);
+                }
+            }
+        }
+        try {
+            this.proxy.close();
+        } catch (RuntimeException e) {
+            if (failure == null) {
+                failure = e;
+            } else {
+                failure.addSuppressed(e);
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Binds one row to the prepared INSERT statement and queues it in the JDBC batch.
+     *
+     * <p>Columns are bound in the order of {@code option.getFields()}; the JDBC parameter
+     * index is the 1-based position of the column in that list. A {@code null} value is
+     * bound as SQL NULL, any other value goes through the inject function registered for
+     * the column's ClickHouse type (or the default one when none is registered).
+     *
+     * @param row                 the row to write
+     * @param clickHouseStatement the prepared statement of the target shard
+     * @throws RuntimeException if binding a parameter or adding the batch entry fails
+     */
+    private void addIntoBatch(SeaTunnelRow row, PreparedStatement clickHouseStatement) {
+        try {
+            int parameterIndex = 0;
+            for (String columnName : option.getFields()) {
+                parameterIndex++;
+                Object columnValue = row.getField(option.getSeaTunnelRowType().indexOf(columnName));
+                if (columnValue != null) {
+                    String columnType = option.getTableSchema().get(columnName);
+                    fieldInjectFunctionMap
+                            .getOrDefault(columnType, DEFAULT_INJECT_FUNCTION)
+                            .injectFields(clickHouseStatement, parameterIndex, columnValue);
+                } else {
+                    // field does not exist in row
+                    // todo: do we need to transform to default value of each type
+                    clickHouseStatement.setObject(parameterIndex, null);
+                }
+            }
+            clickHouseStatement.addBatch();
+        } catch (SQLException e) {
+            throw new RuntimeException("Add row data into batch error", e);
+        }
+    }
+
+    /**
+     * Executes the batch accumulated on the given statement.
+     *
+     * @param clickHouseStatement the prepared statement whose batch is sent
+     * @throws RuntimeException wrapping any exception raised by {@code executeBatch()}
+     */
+    private void flush(PreparedStatement clickHouseStatement) {
+        try {
+            clickHouseStatement.executeBatch();
+        } catch (Exception e) {
+            throw new RuntimeException("Clickhouse execute batch statement error", e);
+        }
+    }
+
+    /**
+     * Opens one JDBC connection per shard known to the shard router and prepares the
+     * INSERT statement ({@code prepareSql}) on it.
+     *
+     * <p>If any shard fails, every connection opened so far (including the one of the
+     * failing shard) is closed before the error is rethrown, so a partially initialised
+     * writer does not leak connections.
+     *
+     * @return the batch statement (connection, prepared statement, row counter) per shard
+     * @throws RuntimeException if a connection cannot be opened or the statement cannot be prepared
+     */
+    private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
+        Map<Shard, ClickhouseBatchStatement> result = new HashMap<>(Common.COLLECTION_SIZE);
+        for (Shard shard : shardRouter.getShards().values()) {
+            ClickHouseConnectionImpl clickhouseConnection = null;
+            try {
+                clickhouseConnection = new ClickHouseConnectionImpl(shard.getJdbcUrl(),
+                        this.option.getProperties());
+                PreparedStatement preparedStatement = clickhouseConnection.prepareStatement(prepareSql);
+                IntHolder intHolder = new IntHolder();
+                result.put(shard,
+                        new ClickhouseBatchStatement(clickhouseConnection, preparedStatement, intHolder));
+            } catch (SQLException e) {
+                // release the connection of the failing shard and of all shards opened before it
+                closeConnectionAfterFailure(clickhouseConnection, e);
+                for (ClickhouseBatchStatement opened : result.values()) {
+                    closeConnectionAfterFailure(opened.getClickHouseConnection(), e);
+                }
+                throw new RuntimeException("Clickhouse prepare statement error: " + e.getMessage(), e);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Closes a connection while an initialisation error is being handled; a failure to
+     * close is attached to {@code cause} as a suppressed exception instead of replacing it.
+     */
+    private static void closeConnectionAfterFailure(ClickHouseConnectionImpl connection, Exception cause) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (SQLException closeError) {
+            cause.addSuppressed(closeError);
+        }
+    }
+
+    /**
+     * Builds the parameterised INSERT statement for the target table.
+     *
+     * <p>The column list is {@code option.getFields()} joined by commas and the VALUES
+     * clause holds one {@code ?} placeholder per column.
+     *
+     * @return SQL of the form {@code INSERT INTO <table> (<c1>,<c2>) VALUES (?,?)}
+     */
+    private String initPrepareSQL() {
+        int columnCount = option.getFields().size();
+        StringBuilder placeholders = new StringBuilder();
+        for (int i = 0; i < columnCount; i++) {
+            if (i > 0) {
+                placeholders.append(',');
+            }
+            placeholders.append('?');
+        }
+
+        return String.format("INSERT INTO %s (%s) VALUES (%s)",
+                shardRouter.getShardTable(),
+                String.join(",", option.getFields()),
+                placeholders);
+    }
+
+    /**
+     * Chooses, for every configured column, the inject function that handles the column's
+     * ClickHouse type.
+     *
+     * <p>The column type is looked up in the table schema and passed through
+     * {@link #unwrapCommonPrefix(String)}; the first candidate whose
+     * {@code isCurrentFieldType} accepts the result wins, and a
+     * {@code StringInjectFunction} is used when no candidate matches. The returned map is
+     * keyed by the original (still wrapped) type string.
+     *
+     * @return inject function per ClickHouse column type
+     */
+    private Map<String, ClickhouseFieldInjectFunction> initFieldInjectFunctionMap() {
+        // candidates are probed in this order; the first match is used
+        List<ClickhouseFieldInjectFunction> candidates = Lists.newArrayList(
+                new ArrayInjectFunction(),
+                new BigDecimalInjectFunction(),
+                new DateInjectFunction(),
+                new DateTimeInjectFunction(),
+                new DoubleInjectFunction(),
+                new FloatInjectFunction(),
+                new IntInjectFunction(),
+                new LongInjectFunction(),
+                new StringInjectFunction()
+        );
+        ClickhouseFieldInjectFunction fallback = new StringInjectFunction();
+        Map<String, ClickhouseFieldInjectFunction> functionsByType = new HashMap<>(Common.COLLECTION_SIZE);
+        for (String column : this.option.getFields()) {
+            String columnType = this.option.getTableSchema().get(column);
+            String bareType = unwrapCommonPrefix(columnType);
+            ClickhouseFieldInjectFunction chosen = candidates.stream()
+                    .filter(candidate -> candidate.isCurrentFieldType(bareType))
+                    .findFirst()
+                    .orElse(fallback);
+            functionsByType.put(columnType, chosen);
+        }
+        return functionsByType;
+    }
+
+    /**
+     * Removes one outer type wrapper from a ClickHouse type string.
+     *
+     * <p>The {@code NULLABLE} pattern is tried first, then {@code LOW_CARDINALITY}; when
+     * one of them matches the whole string, its first capture group is returned. Only a
+     * single level is removed; a type matching neither pattern is returned unchanged.
+     *
+     * @param fieldType the ClickHouse type as reported by the table schema
+     * @return the wrapped inner type, or {@code fieldType} itself
+     */
+    private String unwrapCommonPrefix(String fieldType) {
+        Matcher nullableMatch = NULLABLE.matcher(fieldType);
+        if (nullableMatch.matches()) {
+            return nullableMatch.group(1);
+        }
+        Matcher lowCardinalityMatch = LOW_CARDINALITY.matcher(fieldType);
+        return lowCardinalityMatch.matches() ? lowCardinalityMatch.group(1) : fieldType;
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
new file mode 100644
index 000000000..4471f8157
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+
+import com.clickhouse.client.ClickHouseRequest;
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Chooses the ClickHouse shard a row is written to.
+ *
+ * <p>Shards are kept in a {@link TreeMap} keyed by the start offset of their weight range:
+ * the first shard has key {@code 0}, the next one the weight of the first, and so on, with
+ * {@code shardWeightCount} being the sum of all weights. A number {@code v} in
+ * {@code [1, shardWeightCount]} therefore selects the shard returned by
+ * {@code shards.lowerEntry(v)}, and each shard is selected proportionally to its weight.
+ *
+ * <p>When split mode is off there is exactly one shard (the default shard of the metadata)
+ * and every row goes to it.
+ */
+public class ShardRouter implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    private String shardTable;
+    private final String table;
+    private int shardWeightCount;
+    private final TreeMap<Integer, Shard> shards;
+    private final String shardKey;
+    private final String shardKeyType;
+    private final boolean splitMode;
+
+    private static final XXHash64 HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+    /**
+     * Builds the shard table from the given metadata.
+     *
+     * <p>In split mode the distributed table and the shard list of its cluster are
+     * requested through the proxy; otherwise only the default shard is registered.
+     *
+     * @param proxy         used to query the distributed table and the cluster shard list
+     * @param shardMetadata table, shard key and default shard description
+     * @throws IllegalArgumentException if a shard key is configured but its type is empty,
+     *                                  i.e. the key was not found in the table
+     */
+    public ShardRouter(ClickhouseProxy proxy, ShardMetadata shardMetadata) {
+        this.shards = new TreeMap<>();
+        this.shardKey = shardMetadata.getShardKey();
+        this.shardKeyType = shardMetadata.getShardKeyType();
+        this.splitMode = shardMetadata.getSplitMode();
+        this.table = shardMetadata.getTable();
+        if (StringUtils.isNotEmpty(shardKey) && StringUtils.isEmpty(shardKeyType)) {
+            throw new IllegalArgumentException("Shard key " + shardKey + " not found in table " + table);
+        }
+        ClickHouseRequest<?> connection = proxy.getClickhouseConnection();
+        if (splitMode) {
+            DistributedEngine localTable = proxy.getClickhouseDistributedTable(connection, shardMetadata.getDatabase(), table);
+            this.shardTable = localTable.getTable();
+            List<Shard> shardList = proxy.getClusterShardList(connection, localTable.getClusterName(),
+                    localTable.getDatabase(), shardMetadata.getDefaultShard().getNode().getPort(),
+                    shardMetadata.getUsername(), shardMetadata.getPassword());
+            // key of each shard = sum of the weights of all shards before it
+            int weight = 0;
+            for (Shard shard : shardList) {
+                shards.put(weight, shard);
+                weight += shard.getNode().getWeight();
+            }
+            shardWeightCount = weight;
+        } else {
+            shards.put(0, shardMetadata.getDefaultShard());
+        }
+    }
+
+    /**
+     * @return the table behind the distributed table in split mode, otherwise the
+     *         configured table
+     */
+    public String getShardTable() {
+        return splitMode ? shardTable : table;
+    }
+
+    /**
+     * Selects the shard for a value.
+     *
+     * <p>Without split mode the single registered shard is returned. In split mode a
+     * weighted random shard is chosen when no shard key is configured or the value is
+     * {@code null}; otherwise the shard is derived from the XXHash64 of the value's
+     * string form, so equal values always map to the same shard.
+     *
+     * @param shardValue the value to route by; may be {@code null}
+     * @return the selected shard
+     */
+    public Shard getShard(Object shardValue) {
+        if (!splitMode) {
+            return shards.firstEntry().getValue();
+        }
+        if (StringUtils.isEmpty(shardKey) || shardValue == null) {
+            // nextInt(bound) + 1 is in [1, shardWeightCount]; lowerEntry(0) would be null
+            // because the smallest key is 0. ThreadLocalRandom is fetched per call so the
+            // generator is never shared between threads.
+            return shards.lowerEntry(ThreadLocalRandom.current().nextInt(shardWeightCount) + 1).getValue();
+        }
+        long hash = HASH_INSTANCE.hash(ByteBuffer.wrap(shardValue.toString().getBytes(StandardCharsets.UTF_8)), 0);
+        // clear the sign bit first, then reduce; '%' binds tighter than '&', so the
+        // parentheses are required
+        int offset = (int) ((hash & Long.MAX_VALUE) % shardWeightCount);
+        return shards.lowerEntry(offset + 1).getValue();
+    }
+
+    /**
+     * @return the live shard map, keyed by the start offset of each shard's weight range
+     */
+    public TreeMap<Integer, Shard> getShards() {
+        return shards;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
new file mode 100644
index 000000000..05c511292
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_LOCAL_PATH;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.COPY_METHOD;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_PASS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.clickhouse.client.ClickHouseNode;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@AutoService(SeaTunnelSink.class)
+public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
+
+    private SeaTunnelContext seaTunnelContext;
+    private FileReaderOption readerOption;
+
+    @Override
+    public String getPluginName() {
+        return "ClickhouseFile";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH);
+        if (!checkResult.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, checkResult.getMsg());
+        }
+        Map<String, Object> defaultConfigs = ImmutableMap.<String, Object>builder()
+                .put(COPY_METHOD, ClickhouseFileCopyMethod.SCP.getName())
+                .build();
+
+        config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
+        List<ClickHouseNode> nodes = ClickhouseUtil.createNodes(config.getString(HOST),
+                config.getString(DATABASE), config.getString(USERNAME), config.getString(PASSWORD));
+
+        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
+        Map<String, String> tableSchema = proxy.getClickhouseTableSchema(config.getString(TABLE));
+        String shardKey = null;
+        String shardKeyType = null;
+        if (config.hasPath(SHARDING_KEY)) {
+            shardKey = config.getString(SHARDING_KEY);
+            shardKeyType = tableSchema.get(shardKey);
+        }
+        ShardMetadata shardMetadata = new ShardMetadata(
+                shardKey,
+                shardKeyType,
+                config.getString(DATABASE),
+                config.getString(TABLE),
+                false, // we don't need to set splitMode in clickhouse file mode.
+                new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), config.getString(PASSWORD));
+        List<String> fields;
+        if (config.hasPath(FIELDS)) {
+            fields = config.getStringList(FIELDS);
+            // check if the fields exist in schema
+            for (String field : fields) {
+                if (!tableSchema.containsKey(field)) {
+                    throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE));
+                }
+            }
+        } else {
+            fields = new ArrayList<>(tableSchema.keySet());
+        }
+        Map<String, String> nodePassword = config.getObjectList(NODE_PASS).stream()
+                .collect(Collectors.toMap(configObject -> configObject.toConfig().getString(NODE_ADDRESS),
+                    configObject -> configObject.toConfig().getString(PASSWORD)));
+
+        proxy.close();
+        this.readerOption = new FileReaderOption(shardMetadata, tableSchema, fields, config.getString(CLICKHOUSE_LOCAL_PATH),
+                ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD)), nodePassword);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.readerOption.setSeaTunnelRowType(seaTunnelRowType);
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.readerOption.getSeaTunnelRowType();
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> createWriter(SinkWriter.Context context) throws IOException {
+        return new ClickhouseFileSinkWriter(readerOption, context);
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
new file mode 100644
index 000000000..c6b6bfa1f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.ClickHouseResponse;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Writer of the {@code ClickhouseFile} sink.
+ *
+ * <p>Rows are only cached per shard while the job runs. On {@link #close()} each shard's
+ * rows are dumped into a tab-separated file, turned into table data parts by running the
+ * configured clickhouse-local command, copied into the {@code detached/} directory of the
+ * shard's data path and attached with {@code ALTER TABLE ... ATTACH PART}.
+ *
+ * <p>NOTE(review): the clickhouse-local command line is assembled as one string and run
+ * through {@code bash -c}; table DDL, field names and the configured local path end up in
+ * that string unescaped. They come from the job configuration and the server, but confirm
+ * that none of them can carry untrusted input.
+ */
+public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileSinkWriter.class);
+    private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file";
+    private static final int UUID_LENGTH = 10;
+    /** How the TabSeparated format spells SQL NULL. */
+    private static final String TSV_NULL = "\\N";
+    private final FileReaderOption readerOption;
+    private final ShardRouter shardRouter;
+    private final ClickhouseProxy proxy;
+    private final ClickhouseTable clickhouseTable;
+    private final Map<Shard, List<String>> shardLocalDataPaths;
+    private final Map<Shard, List<SeaTunnelRow>> rowCache;
+
+    /**
+     * Opens a proxy to the default shard, resolves the table metadata and looks up the
+     * data paths of the local table on every shard.
+     *
+     * @param readerOption options prepared by the sink
+     * @param context      writer context supplied by the framework (not used here)
+     * @throws RuntimeException if node passwords are required and one is missing
+     */
+    public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Context context) {
+        this.readerOption = readerOption;
+        proxy = new ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode());
+        shardRouter = new ShardRouter(proxy, this.readerOption.getShardMetadata());
+        clickhouseTable = proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(),
+                this.readerOption.getShardMetadata().getTable());
+        rowCache = new HashMap<>(Common.COLLECTION_SIZE);
+
+        nodePasswordCheck();
+
+        // find file local save path of each node
+        shardLocalDataPaths = shardRouter.getShards().values().stream()
+                .collect(Collectors.toMap(Function.identity(), shard -> {
+                    ClickhouseTable shardTable = proxy.getClickhouseTable(shard.getNode().getDatabase().get(),
+                            clickhouseTable.getLocalTableName());
+                    return shardTable.getDataPaths();
+                }));
+    }
+
+    /**
+     * Caches the row under the shard chosen by the router; nothing is sent until
+     * {@link #close()}.
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Shard shard = shardRouter.getShard(element);
+        rowCache.computeIfAbsent(shard, k -> new ArrayList<>()).add(element);
+    }
+
+    /**
+     * Unless password-free access is enabled, verifies that a password is configured for
+     * every shard, keyed either by the resolved host name or by the node's host.
+     */
+    private void nodePasswordCheck() {
+        if (!this.readerOption.isNodeFreePass()) {
+            shardRouter.getShards().values().forEach(shard -> {
+                if (!this.readerOption.getNodePassword().containsKey(shard.getNode().getAddress().getHostName())
+                        && !this.readerOption.getNodePassword().containsKey(shard.getNode().getHost())) {
+                    throw new RuntimeException("Cannot find password of shard " + shard.getNode().getAddress().getHostName());
+                }
+            });
+        }
+    }
+
+    @Override
+    public Optional<CKCommitInfo> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortPrepare() {
+
+    }
+
+    /**
+     * Flushes the cached rows of every shard and then closes the proxy, also when a
+     * flush fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            rowCache.forEach(this::flush);
+        } finally {
+            proxy.close();
+        }
+    }
+
+    /**
+     * Generates the data parts for one shard's rows, ships and attaches them, and removes
+     * the local working directory afterwards.
+     *
+     * @throws RuntimeException if any step fails; when the thread was interrupted its
+     *                          interrupt flag is restored before the exception is thrown
+     */
+    private void flush(Shard shard, List<SeaTunnelRow> rows) {
+        if (rows.isEmpty()) {
+            // nothing cached for this shard, so there is nothing to generate or attach
+            return;
+        }
+        try {
+            // generate clickhouse local file
+            // TODO generate file by sub rows to save memory
+            List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(rows);
+            // move file to server
+            attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+            // clear local file
+            clearLocalFileDirectory(clickhouseLocalFiles);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Flush data into clickhouse file error", e);
+        } catch (Exception e) {
+            throw new RuntimeException("Flush data into clickhouse file error", e);
+        }
+    }
+
+    /**
+     * Writes the rows into a tab-separated file below a fresh working directory and runs
+     * clickhouse-local on it to produce data parts of the local table.
+     *
+     * @param rows the rows of one shard
+     * @return absolute paths of the generated part directories (empty if {@code rows} is empty)
+     * @throws IOException          if the working files cannot be written or the process cannot be started
+     * @throws InterruptedException if the thread is interrupted while waiting for clickhouse-local
+     * @throws RuntimeException     if clickhouse-local fails or produced no data parts
+     */
+    private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows) throws IOException,
+            InterruptedException {
+        if (rows.isEmpty()) {
+            return Collections.emptyList();
+        }
+        String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_");
+        String clickhouseLocalFile = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid);
+        FileUtils.forceMkdir(new File(clickhouseLocalFile));
+        String clickhouseLocalFileTmpFile = clickhouseLocalFile + "/local_data.log";
+        try (FileChannel fileChannel = FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE,
+                StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
+            String data = rows.stream()
+                    .map(this::toTabSeparatedLine)
+                    .collect(Collectors.joining("\n"));
+            // encode once; the same bytes size the mapping and fill it
+            byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
+            MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
+                    bytes.length);
+            buffer.put(bytes);
+        }
+
+        List<String> localPaths = Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" "))
+                .collect(Collectors.toList());
+        List<String> command = new ArrayList<>(localPaths);
+        // a single token is taken to be the multi-tool binary, which needs the "local" sub command
+        if (localPaths.size() == 1) {
+            command.add("local");
+        }
+        command.add("--file");
+        command.add(clickhouseLocalFileTmpFile);
+        command.add("-S");
+        command.add("\"" + this.readerOption.getFields().stream().map(field -> field + " " + readerOption.getTableSchema().get(field)).collect(Collectors.joining(",")) + "\"");
+        command.add("-N");
+        command.add("\"" + "temp_table" + uuid + "\"");
+        command.add("-q");
+        command.add(String.format(
+                "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
+                clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() + ".", "").replaceAll("`", ""),
+                clickhouseTable.getLocalTableName(),
+                readerOption.getTableSchema().keySet().stream().map(s -> {
+                    if (readerOption.getFields().contains(s)) {
+                        return s;
+                    } else {
+                        return "NULL";
+                    }
+                }).collect(Collectors.joining(",")),
+                uuid));
+        command.add("--path");
+        command.add("\"" + clickhouseLocalFile + "\"");
+        LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command));
+        ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command));
+        // merge stderr into stdout: otherwise an unread, full stderr pipe can block the child
+        // forever, and its error output would never show up in the log
+        processBuilder.redirectErrorStream(true);
+        Process start = processBuilder.start();
+        // we just wait for the process to finish
+        try (InputStream inputStream = start.getInputStream();
+             InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+             BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
+            String line;
+            while ((line = bufferedReader.readLine()) != null) {
+                LOGGER.info(line);
+            }
+        }
+        int exitCode = start.waitFor();
+        if (exitCode != 0) {
+            // do not attach whatever a failed run may have left behind
+            throw new RuntimeException("clickhouse local exited with code " + exitCode);
+        }
+        File file = new File(clickhouseLocalFile + "/data/_local/" + clickhouseTable.getLocalTableName());
+        if (!file.exists()) {
+            throw new RuntimeException("clickhouse local file not exists");
+        }
+        File[] files = file.listFiles();
+        if (files == null) {
+            throw new RuntimeException("clickhouse local file not exists");
+        }
+        List<String> partDirectories = Arrays.stream(files)
+                .filter(File::isDirectory)
+                .filter(f -> !"detached".equals(f.getName()))
+                .map(File::getAbsolutePath).collect(Collectors.toList());
+        if (partDirectories.isEmpty()) {
+            // rows were supplied, so an empty result means data would be lost silently
+            throw new RuntimeException("clickhouse local file not exists");
+        }
+        return partDirectories;
+    }
+
+    /**
+     * Renders one row as a tab-separated line with the configured fields in their
+     * configured order; a {@code null} value is written as {@code \N}.
+     */
+    private String toTabSeparatedLine(SeaTunnelRow row) {
+        return this.readerOption.getFields().stream()
+                .map(field -> {
+                    Object value = row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field));
+                    return value == null ? TSV_NULL : value.toString();
+                })
+                .collect(Collectors.joining("\t"));
+    }
+
+    /**
+     * Copies the generated part directories into the {@code detached/} directory of the
+     * shard's first data path and attaches each of them to the local table.
+     *
+     * @throws ClickHouseException if an ATTACH PART statement fails
+     * @throws RuntimeException    if the configured copy method is not supported
+     */
+    private void attachClickhouseLocalFileToServer(Shard shard, List<String> clickhouseLocalFiles) throws ClickHouseException {
+        if (ClickhouseFileCopyMethod.SCP.equals(this.readerOption.getCopyMethod())) {
+            String hostAddress = shard.getNode().getAddress().getHostName();
+            // accept the same two keys nodePasswordCheck() accepts: host name first, then host
+            String password = readerOption.getNodePassword().get(hostAddress);
+            if (password == null) {
+                password = readerOption.getNodePassword().get(shard.getNode().getHost());
+            }
+            FileTransfer fileTransfer = new ScpFileTransfer(hostAddress, password);
+            fileTransfer.init();
+            try {
+                fileTransfer.transferAndChown(clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) + "detached/");
+            } finally {
+                // release the transfer even when copying fails
+                fileTransfer.close();
+            }
+        } else {
+            throw new RuntimeException("unsupported clickhouse file copy method " + readerOption.getCopyMethod());
+        }
+
+        ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
+        for (String clickhouseLocalFile : clickhouseLocalFiles) {
+            // the part name is the last path segment of the generated directory
+            ClickHouseResponse response = request.query(String.format("ALTER TABLE %s ATTACH PART '%s'",
+                    clickhouseTable.getLocalTableName(),
+                    clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 1))).executeAndWait();
+            response.close();
+        }
+    }
+
+    /**
+     * Deletes the working directory ({@code <prefix>/<uuid>}) that the given part
+     * directories live in; does nothing for an empty list.
+     *
+     * @throws RuntimeException if the directory cannot be deleted
+     */
+    private void clearLocalFileDirectory(List<String> clickhouseLocalFiles) {
+        if (clickhouseLocalFiles.isEmpty()) {
+            return;
+        }
+        String clickhouseLocalFile = clickhouseLocalFiles.get(0);
+        // prefix + "/" + uuid
+        String localFileDir = clickhouseLocalFile.substring(0, CLICKHOUSE_LOCAL_FILE_PREFIX.length() + UUID_LENGTH + 1);
+        try {
+            File file = new File(localFileDir);
+            if (file.exists()) {
+                FileUtils.deleteDirectory(file);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to delete directory " + localFileDir, e);
+        }
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
new file mode 100644
index 000000000..a6c8e0fe0
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mutable holder for the metadata of a clickhouse table used by the file sink.
+ *
+ * <p>Only the distributed engine is fixed at construction time; every other property can be
+ * replaced through its setter.
+ */
+public class ClickhouseTable {
+
+    private String database;
+    private String tableName;
+    private String engine;
+    private String engineFull;
+    private String createTableDDL;
+    private List<String> dataPaths;
+    private final DistributedEngine distributedEngine;
+    private Map<String, String> tableSchema;
+
+    /**
+     * Creates the holder from already resolved table metadata.
+     *
+     * @param database          database of the table
+     * @param tableName         name of the table
+     * @param distributedEngine distributed engine description, or null when there is none
+     * @param engine            engine of the table
+     * @param createTableDDL    DDL statement of the table
+     * @param engineFull        full engine definition of the table
+     * @param dataPaths         data paths of the table
+     * @param tableSchema       schema of the table
+     */
+    public ClickhouseTable(String database,
+                           String tableName,
+                           DistributedEngine distributedEngine,
+                           String engine,
+                           String createTableDDL,
+                           String engineFull,
+                           List<String> dataPaths,
+                           Map<String, String> tableSchema) {
+        // assigned in parameter order
+        this.database = database;
+        this.tableName = tableName;
+        this.distributedEngine = distributedEngine;
+        this.engine = engine;
+        this.createTableDDL = createTableDDL;
+        this.engineFull = engineFull;
+        this.dataPaths = dataPaths;
+        this.tableSchema = tableSchema;
+    }
+
+    /**
+     * Resolves the name of the local table.
+     *
+     * @return the distributed engine's table when a distributed engine is present, otherwise the table name
+     */
+    public String getLocalTableName() {
+        return distributedEngine == null ? tableName : distributedEngine.getTable();
+    }
+
+    // ---------------------------------------------------------------- getters
+
+    /** @return the table name */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /** @return the database */
+    public String getDatabase() {
+        return database;
+    }
+
+    /** @return the engine */
+    public String getEngine() {
+        return engine;
+    }
+
+    /** @return the full engine definition */
+    public String getEngineFull() {
+        return engineFull;
+    }
+
+    /** @return the DDL statement of the table */
+    public String getCreateTableDDL() {
+        return createTableDDL;
+    }
+
+    /** @return the data paths */
+    public List<String> getDataPaths() {
+        return dataPaths;
+    }
+
+    /** @return the table schema */
+    public Map<String, String> getTableSchema() {
+        return tableSchema;
+    }
+
+    // ---------------------------------------------------------------- setters
+
+    /** @param tableName the new table name */
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    /** @param database the new database */
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    /** @param engine the new engine */
+    public void setEngine(String engine) {
+        this.engine = engine;
+    }
+
+    /** @param engineFull the new full engine definition */
+    public void setEngineFull(String engineFull) {
+        this.engineFull = engineFull;
+    }
+
+    /** @param createTableDDL the new DDL statement */
+    public void setCreateTableDDL(String createTableDDL) {
+        this.createTableDDL = createTableDDL;
+    }
+
+    /** @param dataPaths the new data paths */
+    public void setDataPaths(List<String> dataPaths) {
+        this.dataPaths = dataPaths;
+    }
+
+    /** @param tableSchema the new table schema */
+    public void setTableSchema(Map<String, String> tableSchema) {
+        this.tableSchema = tableSchema;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransfer.java
similarity index 67%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransfer.java
index 65b7af7c6..ca581f0ea 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransfer.java
@@ -15,21 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
+import java.util.List;
 
-    public static final String DATABASE = "database";
+/**
+ * Moves locally generated files to a target location and adjusts their ownership there.
+ */
+public interface FileTransfer {
 
-    public static final String SQL = "sql";
+    /**
+     * Prepares the transfer; must be called before any file is transferred.
+     */
+    void init();
 
-    public static final String USERNAME = "username";
+    /**
+     * Transfers a single source path to the target path and changes the owner of the result.
+     *
+     * @param sourcePath path to transfer
+     * @param targetPath destination path
+     */
+    void transferAndChown(String sourcePath, String targetPath);
 
-    public static final String PASSWORD = "password";
+    /**
+     * Transfers several source paths to the same target path and changes the owner of the result.
+     *
+     * @param sourcePath paths to transfer
+     * @param targetPath destination path
+     */
+    void transferAndChown(List<String> sourcePath, String targetPath);
 
+    /**
+     * Releases whatever {@link #init()} acquired.
+     */
+    void close();
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
new file mode 100644
index 000000000..6fa83794c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.scp.client.ScpClient;
+import org.apache.sshd.scp.client.ScpClientCreator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link FileTransfer} that uploads files to a remote host over SCP and then changes the owner
+ * of the uploaded files with a remote shell command.
+ *
+ * <p>The connection is always opened as user {@code root} on port 22 and authenticates with the
+ * password when one is supplied.
+ */
+public class ScpFileTransfer implements FileTransfer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ScpFileTransfer.class);
+
+    private static final int SCP_PORT = 22;
+
+    private final String host;
+    private final String password;
+
+    private ScpClient scpClient;
+    private ClientSession clientSession;
+    private SshClient sshClient;
+
+    /**
+     * @param host     remote host to connect to
+     * @param password password of the remote root user, or null to skip password authentication
+     */
+    public ScpFileTransfer(String host, String password) {
+        this.host = host;
+        this.password = password;
+    }
+
+    /**
+     * Starts the ssh client, opens and authenticates the session and creates the scp client.
+     *
+     * @throws RuntimeException if connecting or authenticating fails; anything opened so far is
+     *                          closed before the exception is thrown
+     */
+    @Override
+    public void init() {
+        try {
+            sshClient = SshClient.setUpDefaultClient();
+            sshClient.start();
+            clientSession = sshClient.connect("root", host, SCP_PORT).verify().getSession();
+            if (password != null) {
+                clientSession.addPasswordIdentity(password);
+            }
+            // TODO support add publicKey to identity
+            if (!clientSession.auth().verify().isSuccess()) {
+                throw new IOException("ssh host " + host + " authentication failed");
+            }
+            scpClient = ScpClientCreator.instance().createScpClient(clientSession);
+        } catch (IOException e) {
+            // the client has already been started at this point; release it so a failed init
+            // does not leave the client and session open
+            try {
+                close();
+            } catch (RuntimeException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw new RuntimeException("Failed to connect to host: " + host + " by user: root on port 22", e);
+        }
+    }
+
+    /**
+     * Uploads {@code sourcePath} recursively into {@code targetPath} and then changes the owner of
+     * {@code targetPath} to the owner of the last entry listed in its parent directory.
+     *
+     * @param sourcePath local path to upload
+     * @param targetPath remote directory to upload into
+     * @throws RuntimeException if the upload fails; a failure of the ownership command is only logged
+     */
+    @Override
+    public void transferAndChown(String sourcePath, String targetPath) {
+        try {
+            scpClient.upload(
+                sourcePath,
+                targetPath,
+                ScpClient.Option.Recursive,
+                ScpClient.Option.TargetIsDirectory,
+                ScpClient.Option.PreserveAttributes);
+        } catch (IOException e) {
+            throw new RuntimeException("Scp failed to transfer file: " + sourcePath + " to: " + targetPath, e);
+        }
+        // remote exec command to change file owner. Only file owner equal with server's clickhouse user can
+        // make ATTACH command work.
+        String parentDirectory = targetPath.substring(0,
+                StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/";
+        List<String> command = new ArrayList<>();
+        command.add("ls");
+        command.add("-l");
+        command.add(parentDirectory);
+        // take the owner column of the last listed entry and apply it as owner and group
+        command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
+        try {
+            String finalCommand = String.join(" ", command);
+            LOGGER.info("execute remote command: {}", finalCommand);
+            clientSession.executeRemoteCommand(finalCommand);
+        } catch (IOException e) {
+            // Deliberately not rethrown: the original author notes this call always reports an
+            // error because xargs returns the shell command result. Keep a trace for diagnosis.
+            LOGGER.debug("remote chown command reported an error for {}", targetPath, e);
+        }
+    }
+
+    /**
+     * Transfers every path of {@code sourcePaths} with {@link #transferAndChown(String, String)}.
+     *
+     * @param sourcePaths local paths to upload
+     * @param targetPath  remote directory to upload into
+     * @throws IllegalArgumentException if {@code sourcePaths} is null
+     */
+    @Override
+    public void transferAndChown(List<String> sourcePaths, String targetPath) {
+        if (sourcePaths == null) {
+            throw new IllegalArgumentException("sourcePath is null");
+        }
+        sourcePaths.forEach(sourcePath -> transferAndChown(sourcePath, targetPath));
+    }
+
+    /**
+     * Closes the session and then the client; the client is closed even when closing the session fails.
+     *
+     * @throws RuntimeException if closing the session or the client fails
+     */
+    @Override
+    public void close() {
+        try {
+            if (clientSession != null && clientSession.isOpen()) {
+                try {
+                    clientSession.close();
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to close ssh session", e);
+                }
+            }
+        } finally {
+            if (sshClient != null && sshClient.isOpen()) {
+                sshClient.stop();
+                try {
+                    sshClient.close();
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to close ssh client", e);
+                }
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
similarity index 56%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
index 65b7af7c6..c564e5501 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
@@ -15,21 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.regex.Pattern;
 
-    public static final String SQL = "sql";
+/**
+ * Injects values for ClickHouse field types whose name starts with {@code Array}.
+ */
+public class ArrayInjectFunction implements ClickhouseFieldInjectFunction {
 
-    public static final String USERNAME = "username";
+    // used with matches(), so the whole type name has to fit: any name beginning with "Array"
+    private static final Pattern PATTERN = Pattern.compile("(Array.*)");
 
-    public static final String PASSWORD = "password";
+    /**
+     * Sets the value as an array parameter of the statement.
+     *
+     * @param statement statement to inject into
+     * @param index     parameter index passed to {@code setArray}
+     * @param value     value to inject; it is cast to {@link java.sql.Array}
+     */
+    @Override
+    public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+        statement.setArray(index, (java.sql.Array) value);
+    }
 
+    /**
+     * @param fieldType field type name to check
+     * @return true if the whole field type name matches {@code Array.*}
+     */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return PATTERN.matcher(fieldType).matches();
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/BigDecimalInjectFunction.java
similarity index 55%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/BigDecimalInjectFunction.java
index 65b7af7c6..25c73ab3f 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/BigDecimalInjectFunction.java
@@ -15,21 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.regex.Pattern;
 
-    public static final String SQL = "sql";
+/**
+ * Injects values for ClickHouse field types whose name starts with {@code Decimal}.
+ */
+public class BigDecimalInjectFunction implements ClickhouseFieldInjectFunction {
 
-    public static final String USERNAME = "username";
+    // used with matches(), so the whole type name has to fit: any name beginning with "Decimal"
+    private static final Pattern PATTERN = Pattern.compile("(Decimal.*)");
 
-    public static final String PASSWORD = "password";
+    /**
+     * Sets the value as a decimal parameter of the statement.
+     *
+     * @param statement statement to inject into
+     * @param index     parameter index passed to {@code setBigDecimal}
+     * @param value     value to inject; it is cast to {@link java.math.BigDecimal}
+     */
+    @Override
+    public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+        statement.setBigDecimal(index, (java.math.BigDecimal) value);
+    }
 
+    /**
+     * @param fieldType field type name to check
+     * @return true if the whole field type name matches {@code Decimal.*}
+     */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return PATTERN.matcher(fieldType).matches();
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java
new file mode 100644
index 000000000..3e27a6343
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Injects a field into a ClickHouse statement; used to transform a Java type into a ClickHouse type.
+ */
+public interface ClickhouseFieldInjectFunction extends Serializable {
+
+    /**
+     * Injects the value into the statement.
+     *
+     * @param statement statement to inject into
+     * @param index     index in the statement
+     * @param value     value to inject
+     * @throws SQLException if the statement call fails
+     */
+    void injectFields(PreparedStatement statement, int index, Object value) throws SQLException;
+
+    /**
+     * Tells whether the given field type needs to be injected by this function.
+     *
+     * @param fieldType field type to check
+     * @return true if the field type needs to be injected by this function
+     */
+    boolean isCurrentFieldType(String fieldType);
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateInjectFunction.java
similarity index 54%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateInjectFunction.java
index 65b7af7c6..7a0b0b648 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateInjectFunction.java
@@ -15,21 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
-
-    public static final String SQL = "sql";
-
-    public static final String USERNAME = "username";
-
-    public static final String PASSWORD = "password";
-
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Injects values for the ClickHouse {@code Date} field type.
+ */
+public class DateInjectFunction implements ClickhouseFieldInjectFunction {
+
+    private static final String FIELD_TYPE = "Date";
+
+    /**
+     * Sets the value as a date parameter; anything that is not already a {@link Date} is parsed
+     * from its string form with {@link Date#valueOf(String)}.
+     */
+    @Override
+    public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+        Date sqlDate = value instanceof Date ? (Date) value : Date.valueOf(value.toString());
+        statement.setDate(index, sqlDate);
+    }
+
+    /**
+     * @return true only for the exact type name {@code Date}
+     */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return FIELD_TYPE.equals(fieldType);
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
similarity index 53%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
index 65b7af7c6..b85c56afb 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
@@ -15,21 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
-
-    public static final String SQL = "sql";
-
-    public static final String USERNAME = "username";
-
-    public static final String PASSWORD = "password";
-
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+/**
+ * Injects values for the ClickHouse {@code DateTime} field type.
+ */
+public class DateTimeInjectFunction implements ClickhouseFieldInjectFunction {
+
+    private static final String FIELD_TYPE = "DateTime";
+
+    /**
+     * Sets the value as a timestamp parameter; anything that is not already a {@link Timestamp}
+     * is parsed from its string form with {@link Timestamp#valueOf(String)}.
+     */
+    @Override
+    public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+        Timestamp timestamp = value instanceof Timestamp
+            ? (Timestamp) value
+            : Timestamp.valueOf(value.toString());
+        statement.setTimestamp(index, timestamp);
+    }
+
+    /**
+     * @return true only for the exact type name {@code DateTime}
+     */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return FIELD_TYPE.equals(fieldType);
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DoubleInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DoubleInjectFunction.java
new file mode 100644
index 000000000..c416d110c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DoubleInjectFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Injects values as double parameters.
+ *
+ * <p>NOTE(review): UInt32, UInt64 and Int64 are also accepted by LongInjectFunction; which of the
+ * two handles such a field depends on the lookup order used by the caller - confirm this overlap
+ * is intended.
+ */
+public class DoubleInjectFunction implements ClickhouseFieldInjectFunction {
+
+    private static final String[] FIELD_TYPES = {"UInt32", "UInt64", "Int64", "Float64"};
+
+    /**
+     * Sets the value as a double parameter; a {@link BigDecimal} is converted with
+     * {@code doubleValue()}, any other value is cast to {@link Double}.
+     */
+    @Override
+    public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+        double converted = value instanceof BigDecimal
+            ? ((BigDecimal) value).doubleValue()
+            : (Double) value;
+        statement.setDouble(index, converted);
+    }
+
+    /**
+     * @return true for the exact type names UInt32, UInt64, Int64 and Float64
+     */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        for (String supported : FIELD_TYPES) {
+            if (supported.equals(fieldType)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/FloatInjectFunction.java
similarity index 53%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/FloatInjectFunction.java
index 65b7af7c6..84464808b 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/FloatInjectFunction.java
@@ -15,21 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
-
-    public static final String SQL = "sql";
-
-    public static final String USERNAME = "username";
-
-    public static final String PASSWORD = "password";
-
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Injects values for the ClickHouse {@code Float32} field type.
+ */
+public class FloatInjectFunction implements ClickhouseFieldInjectFunction {
+
+    private static final String FIELD_TYPE = "Float32";
+
+    /**
+     * Sets the value as a float parameter; a {@link BigDecimal} is converted with
+     * {@code floatValue()}, any other value is cast to {@link Float}.
+     */
+    @Override
+    public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+        float converted = value instanceof BigDecimal
+            ? ((BigDecimal) value).floatValue()
+            : (Float) value;
+        statement.setFloat(index, converted);
+    }
+
+    /**
+     * @return true only for the exact type name {@code Float32}
+     */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return FIELD_TYPE.equals(fieldType);
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/IntInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/IntInjectFunction.java
new file mode 100644
index 000000000..f6e8c27dc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/IntInjectFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Injects values for the ClickHouse integer field types Int8, UInt8, Int16, UInt16 and Int32.
+ */
+public class IntInjectFunction implements ClickhouseFieldInjectFunction {
+
+    private static final String[] FIELD_TYPES = {"Int8", "UInt8", "Int16", "UInt16", "Int32"};
+
+    /**
+     * Picks the setter from the runtime type of the value: {@link Byte} and {@link Short} use
+     * their own setters, every other value is cast to {@link Integer}.
+     */
+    @Override
+    public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+        if (value instanceof Byte) {
+            statement.setByte(index, (Byte) value);
+            return;
+        }
+        if (value instanceof Short) {
+            statement.setShort(index, (Short) value);
+            return;
+        }
+        statement.setInt(index, (Integer) value);
+    }
+
+    /**
+     * @return true for the exact type names Int8, UInt8, Int16, UInt16 and Int32
+     */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        for (String supported : FIELD_TYPES) {
+            if (supported.equals(fieldType)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/LongInjectFunction.java
similarity index 57%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/LongInjectFunction.java
index 65b7af7c6..ccd3e60b8 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/LongInjectFunction.java
@@ -15,21 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
-
-    public static final String SQL = "sql";
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 
-    public static final String USERNAME = "username";
+public class LongInjectFunction implements ClickhouseFieldInjectFunction {
 
-    public static final String PASSWORD = "password";
+    /**
+     * Binds {@code value} to parameter {@code index} through {@code setLong}.
+     * The value is cast to {@code Long}, so any other runtime type fails the cast.
+     */
+    @Override
+    public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+        final Long longValue = (Long) value;
+        statement.setLong(index, longValue);
+    }
 
+    /** Accepts exactly the type names UInt32, UInt64 and Int64; anything else, including null, is rejected. */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        if ("UInt32".equals(fieldType)) {
+            return true;
+        }
+        if ("UInt64".equals(fieldType)) {
+            return true;
+        }
+        return "Int64".equals(fieldType);
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
similarity index 61%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
index 65b7af7c6..4894774dc 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
@@ -15,21 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
-
-    public static final String SQL = "sql";
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 
-    public static final String USERNAME = "username";
+public class StringInjectFunction implements ClickhouseFieldInjectFunction {
 
-    public static final String PASSWORD = "password";
+    /**
+     * Binds the {@code toString()} form of {@code value} to parameter {@code index} through
+     * {@code setString}. A null {@code value} fails with a NullPointerException.
+     */
+    @Override
+    public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+        final String text = value.toString();
+        statement.setString(index, text);
+    }
 
+    /** Accepts only the exact type name {@code String}; null is rejected. */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        final String expected = "String";
+        return expected.equals(fieldType);
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index c5166d8b7..a94344e72 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
 
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SQL;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
@@ -37,22 +37,19 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.clickhouse.client.ClickHouseClient;
-import com.clickhouse.client.ClickHouseCredentials;
 import com.clickhouse.client.ClickHouseException;
 import com.clickhouse.client.ClickHouseFormat;
 import com.clickhouse.client.ClickHouseNode;
-import com.clickhouse.client.ClickHouseProtocol;
 import com.clickhouse.client.ClickHouseResponse;
 import com.google.auto.service.AutoService;
 
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 @AutoService(SeaTunnelSource.class)
 public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit, ClickhouseSourceState> {
@@ -68,17 +65,12 @@ public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, Clickhous
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, NODE_ADDRESS, DATABASE, SQL, USERNAME, PASSWORD);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, DATABASE, SQL, USERNAME, PASSWORD);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
-        servers = Arrays.stream(config.getString(NODE_ADDRESS).split(",")).map(address -> {
-            String[] nodeAndPort = address.split(":", 2);
-            return ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP,
-                            Integer.parseInt(nodeAndPort[1])).database(config.getString(DATABASE))
-                    .credentials(ClickHouseCredentials.fromUserAndPassword(config.getString(USERNAME),
-                            config.getString(PASSWORD))).build();
-        }).collect(Collectors.toList());
+        servers = ClickhouseUtil.createNodes(config.getString(HOST), config.getString(DATABASE),
+                config.getString(USERNAME), config.getString(PASSWORD));
 
         sql = config.getString(SQL);
         try (ClickHouseClient client = ClickHouseClient.newInstance(servers.get(0).getProtocol());
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
index ce07b3412..66d0621df 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
@@ -33,6 +33,7 @@ public class ClickhouseSourceSplitEnumerator implements
     private final Set<Integer> readers;
     private volatile int assigned = -1;
 
+    // TODO: support reading from the Distributed engine with multiple splits
     ClickhouseSourceSplitEnumerator(Context<ClickhouseSourceSplit> enumeratorContext) {
         this.context = enumeratorContext;
         this.readers = new HashSet<>();
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKAggCommitInfo.java
similarity index 66%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKAggCommitInfo.java
index 65b7af7c6..2de15ac9c 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKAggCommitInfo.java
@@ -15,21 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
-
-    public static final String SQL = "sql";
-
-    public static final String USERNAME = "username";
-
-    public static final String PASSWORD = "password";
+import java.io.Serializable;
 
+public class CKAggCommitInfo implements Serializable { // Empty Serializable type: declares no fields or methods. NOTE(review): presumably the aggregated commit info of the ClickHouse sink - confirm against its users.
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKCommitInfo.java
similarity index 66%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKCommitInfo.java
index 65b7af7c6..99464801d 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKCommitInfo.java
@@ -15,21 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
-
-    public static final String SQL = "sql";
-
-    public static final String USERNAME = "username";
-
-    public static final String PASSWORD = "password";
+import java.io.Serializable;
 
+public class CKCommitInfo implements Serializable { // Empty Serializable type: declares no fields or methods. NOTE(review): presumably the per-writer commit info of the ClickHouse sink - confirm against its users.
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSinkState.java
similarity index 66%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSinkState.java
index 65b7af7c6..28d9dc2ed 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSinkState.java
@@ -15,21 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String DATABASE = "database";
-
-    public static final String SQL = "sql";
-
-    public static final String USERNAME = "username";
-
-    public static final String PASSWORD = "password";
+import java.io.Serializable;
 
+public class ClickhouseSinkState implements Serializable { // Empty Serializable type: declares no fields or methods. NOTE(review): presumably the checkpoint state of the ClickHouse sink writer - confirm against its users.
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
similarity index 67%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
index 65b7af7c6..02e7be596 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
@@ -15,21 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.tool;
 
-/**
- * The config of clickhouse
- */
-public class Config {
-
-    public static final String NODE_ADDRESS = "node_address";
+import java.io.Serializable;
 
-    public static final String DATABASE = "database";
+public class IntHolder implements Serializable { // Mutable, Serializable wrapper around a single int.
 
-    public static final String SQL = "sql";
+    private static final long serialVersionUID = -1L;
 
-    public static final String USERNAME = "username";
+    private int value; // The wrapped int; it has no initializer, so it starts at Java's default of 0.
 
-    public static final String PASSWORD = "password";
+    public int getValue() { // Returns the int currently held.
+        return value;
+    }
 
+    public void setValue(int value) { // Overwrites the held int with the given value.
+        this.value = value;
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
new file mode 100644
index 000000000..38c835831
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
+
+import com.clickhouse.client.ClickHouseCredentials;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseProtocol;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Helpers for building ClickHouse client objects from connector configuration values.
+ */
+public class ClickhouseUtil {
+
+    /**
+     * Builds one {@link ClickHouseNode} per entry of a comma-separated {@code host:port} list.
+     * Every node gets its port registered for {@link ClickHouseProtocol#HTTP} and shares the given
+     * database and credentials. Whitespace around entries, hosts and ports is ignored, so
+     * {@code "h1:8123, h2:8123"} is accepted.
+     *
+     * @param nodeAddress comma-separated addresses, each in the form {@code host:port}
+     * @param database    database set on every node
+     * @param username    user name used for the node credentials
+     * @param password    password used for the node credentials
+     * @return the nodes, in the order the addresses were listed
+     * @throws IllegalArgumentException if an entry has no port or its port is not an integer
+     */
+    public static List<ClickHouseNode> createNodes(String nodeAddress, String database, String username,
+                                                   String password) {
+        return Arrays.stream(nodeAddress.split(",")).map(String::trim).map(address -> {
+            String[] nodeAndPort = address.split(":", 2);
+            // Without this check a missing port surfaces as an opaque ArrayIndexOutOfBoundsException.
+            if (nodeAndPort.length < 2) {
+                throw new IllegalArgumentException(
+                        "Invalid clickhouse node address '" + address + "', expected format is host:port");
+            }
+            return ClickHouseNode.builder().host(nodeAndPort[0].trim()).port(ClickHouseProtocol.HTTP,
+                            Integer.parseInt(nodeAndPort[1].trim())).database(database)
+                    .credentials(ClickHouseCredentials.fromUserAndPassword(username, password)).build();
+        }).collect(Collectors.toList());
+    }
+
+}
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 091b8ca9a..dd1a4432c 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -354,7 +354,6 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Apache Commons Email (org.apache.commons:commons-email:1.5 - http://commons.apache.org/proper/commons-email/)
      (Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.11.0 - https://commons.apache.org/proper/commons-io/)
      (Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.5 - http://commons.apache.org/proper/commons-io/)
-     (Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.8.0 - https://commons.apache.org/proper/commons-io/)
      (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.4 - http://commons.apache.org/proper/commons-lang/)
      (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.5 - http://commons.apache.org/proper/commons-lang/)
      (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.6 - http://commons.apache.org/proper/commons-lang/)
@@ -421,6 +420,7 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.6 - http://hc.apache.org/httpcomponents-client)
      (Apache License, Version 2.0) Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.9 - http://hc.apache.org/httpcomponents-client)
      (Apache License, Version 2.0) Apache HttpClient Mime (org.apache.httpcomponents:httpmime:4.5.2 - http://hc.apache.org/httpcomponents-client)
+     (Apache License, Version 2.0) Apache HttpClient Mime (org.apache.httpcomponents:httpmime:4.5.13 - http://hc.apache.org/httpcomponents-client)
      (Apache License, Version 2.0) Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.10 - http://hc.apache.org/httpcomponents-core-ga)
      (Apache License, Version 2.0) Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.11 - http://hc.apache.org/httpcomponents-core-ga)
      (Apache License, Version 2.0) Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.12 - http://hc.apache.org/httpcomponents-core-ga)
@@ -760,6 +760,7 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Google HTTP Client Library for Java (com.google.http-client:google-http-client:1.26.0 - https://github.com/googleapis/google-http-java-client/google-http-client)
      (The Apache Software License, Version 2.0) Google OAuth Client Library for Java (com.google.oauth-client:google-oauth-client:1.26.0 - https://github.com/googleapis/google-oauth-java-client/google-oauth-client)
      (The Apache Software License, Version 2.0) Gson (com.google.code.gson:gson:2.2.4 - http://code.google.com/p/google-gson/)
+     (The Apache Software License, Version 2.0) Gson (com.google.code.gson:gson:2.9.0 - http://code.google.com/p/google-gson/)
      (The Apache Software License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava)
      (The Apache Software License, Version 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)
      (The Apache Software License, Version 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc)
@@ -833,7 +834,9 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) aggs-matrix-stats (org.elasticsearch.plugin:aggs-matrix-stats-client:7.5.1 - https://github.com/elastic/elasticsearch)
      (The Apache Software License, Version 2.0) cli (org.elasticsearch:elasticsearch-cli:6.3.1 - https://github.com/elastic/elasticsearch)
      (The Apache Software License, Version 2.0) clickhouse-client (com.clickhouse:clickhouse-client:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc)
+     (The Apache Software License, Version 2.0) clickhouse-grpc-client (com.clickhouse:clickhouse-grpc-client:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc)
      (The Apache Software License, Version 2.0) clickhouse-http-client (com.clickhouse:clickhouse-http-client:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc)
+     (The Apache Software License, Version 2.0) clickhouse-jdbc (com.clickhouse:clickhouse-jdbc:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc)
      (The Apache Software License, Version 2.0) clickhouse-jdbc (ru.yandex.clickhouse:clickhouse-jdbc:0.2 - https://github.com/yandex/clickhouse-jdbc)
      (The Apache Software License, Version 2.0) elasticsearch-cli (org.elasticsearch:elasticsearch-cli:7.5.1 - https://github.com/elastic/elasticsearch)
      (The Apache Software License, Version 2.0) elasticsearch-core (org.elasticsearch:elasticsearch-core:6.3.1 - https://github.com/elastic/elasticsearch)
diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE
index 4f0186eb7..fe3c949f6 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -4377,4 +4377,16 @@ Copyright 2017-2021 The Apache Software Foundation
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
 
+=========================================================================
+
+Apache HttpClient Mime NOTICE
+
+=========================================================================
+
+Apache HttpClient Mime
+Copyright 1999-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
 =========================================================================
\ No newline at end of file
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index cc8427d9d..edbd78812 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -62,14 +62,14 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
         return pluginIdentifiers.stream()
             .map(this::getPluginJarPath)
             .filter(Optional::isPresent)
-            .map(Optional::get)
+            .map(Optional::get).distinct()
             .collect(Collectors.toList());
     }
 
     @Override
     public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
         return pluginIdentifiers.stream()
-            .map(this::getPluginInstance)
+            .map(this::getPluginInstance).distinct() // distinct() drops duplicates as judged by equals(). NOTE(review): confirm plugin instances define equals/hashCode as intended, otherwise only identical references are merged.
             .collect(Collectors.toList());
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
index 09ab6d239..bcd3d7887 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.MutableLong;
 import org.apache.spark.sql.catalyst.expressions.MutableShort;
 import org.apache.spark.sql.catalyst.expressions.MutableValue;
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 import java.sql.Date;
@@ -77,6 +78,8 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
                 return Timestamp.valueOf((LocalDateTime) field);
             case MAP:
                 return convertMap((Map<?, ?>) field, (MapType<?, ?>) dataType, InternalRowConverter::convert);
+            case STRING:
+                return UTF8String.fromString((String) field);
             default:
                 return field;
         }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index ec31649e7..6674db544 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -78,6 +78,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
         }
         SparkWriterCommitMessage<CommitInfoT> sparkWriterCommitMessage = new SparkWriterCommitMessage<>(latestCommitInfoT);
         cleanCommitInfo();
+        sinkWriter.close();
         return sparkWriterCommitMessage;
     }
 
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index c3327b428..5ad2f9439 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -65,8 +65,11 @@ chill-java-0.9.3.jar
 chill_2.11-0.9.3.jar
 classmate-1.1.0.jar
 clickhouse-client-0.3.2-patch9.jar
+clickhouse-grpc-client-0.3.2-patch9-netty.jar
+clickhouse-http-client-0.3.2-patch9-shaded.jar
 clickhouse-http-client-0.3.2-patch9.jar
 clickhouse-jdbc-0.2.jar
+clickhouse-jdbc-0.3.2-patch9.jar
 commons-beanutils-1.9.3.jar
 commons-cli-1.2.jar
 commons-cli-1.3.1.jar
@@ -186,6 +189,7 @@ google-http-client-1.26.0.jar
 google-http-client-jackson2-1.26.0.jar
 google-oauth-client-1.26.0.jar
 gson-2.2.4.jar
+gson-2.9.0.jar
 guava-19.0.jar
 guice-3.0.jar
 guice-4.1.0.jar
@@ -264,6 +268,7 @@ httpasyncclient-4.1.4.jar
 httpclient-4.5.13.jar
 httpcore-4.4.4.jar
 httpcore-nio-4.4.4.jar
+httpmime-4.5.13.jar
 httpmime-4.5.2.jar
 hudi-spark-bundle_2.11-0.10.0.jar
 i18n-util-1.0.4.jar