You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/04/14 01:55:07 UTC

[incubator-seatunnel] branch dev updated: [Feature][Plugin] Add Plugin Flink Sink Clickhouse (#1688)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c3d5f166 [Feature][Plugin] Add Plugin Flink Sink Clickhouse (#1688)
c3d5f166 is described below

commit c3d5f16635246e5a71868f8396612f83a7311fa2
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Apr 14 09:54:59 2022 +0800

    [Feature][Plugin] Add Plugin Flink Sink Clickhouse (#1688)
    
    * [Feature][Plugin] Add Plugin Flink Sink Clickhouse
    
    * fix code style
    
    * Add serializable into Retrymaterial
    
    * Add doc
---
 docs/en/connector/sink/Clickhouse.md               |   2 +-
 .../common/config/TypesafeConfigUtils.java         |   4 +-
 .../apache/seatunnel/common/utils/RetryUtils.java  |  96 +++++++++
 .../seatunnel-connectors-flink/pom.xml             |   1 +
 .../seatunnel-connector-flink-clickhouse}/pom.xml  |  34 ++-
 .../seatunnel/flink/clickhouse/ConfigKey.java      | 109 ++++++++++
 .../flink/clickhouse/pojo/DistributedEngine.java   |  57 +++++
 .../seatunnel/flink/clickhouse/pojo/IntHolder.java |  21 +-
 .../seatunnel/flink/clickhouse/pojo/Shard.java     | 132 ++++++++++++
 .../flink/clickhouse/pojo/ShardMetadata.java       | 135 ++++++++++++
 .../flink/clickhouse/sink/ClickhouseBatchSink.java | 141 +++++++++++++
 .../clickhouse/sink/ClickhouseOutputFormat.java    | 231 +++++++++++++++++++++
 .../sink/client/ClickhouseBatchStatement.java      |  51 +++++
 .../clickhouse/sink/client/ClickhouseClient.java   | 141 +++++++++++++
 .../flink/clickhouse/sink/client/ShardRouter.java  |  99 +++++++++
 .../sink/inject/ArrayInjectFunction.java           |  25 ++-
 .../sink/inject/BigDecimalInjectFunction.java      |  23 +-
 .../sink/inject/ClickhouseFieldInjectFunction.java |  46 ++++
 .../clickhouse/sink/inject/DateInjectFunction.java |  28 +--
 .../sink/inject/DateTimeInjectFunction.java        |  28 +--
 .../sink/inject/DoubleInjectFunction.java          |  42 ++++
 .../sink/inject/FloatInjectFunction.java           |  28 +--
 .../clickhouse/sink/inject/IntInjectFunction.java  |  28 +--
 .../clickhouse/sink/inject/LongInjectFunction.java |  24 ++-
 .../sink/inject/StringInjectFunction.java          |  25 ++-
 .../org.apache.seatunnel.flink.BaseFlinkSink       |  18 ++
 seatunnel-core/seatunnel-core-flink/pom.xml        |   6 +
 seatunnel-e2e/seatunnel-flink-e2e/pom.xml          |   7 +
 .../apache/seatunnel/e2e/flink/FlinkContainer.java |   4 +-
 .../flink/clickhouse/FakeSourceToClickhouseIT.java | 118 +++++++++++
 .../e2e/flink/fake/FakeSourceToConsoleIT.java      |   2 +-
 .../clickhouse/fakesource_to_clickhouse.conf       |  60 ++++++
 32 files changed, 1649 insertions(+), 117 deletions(-)

diff --git a/docs/en/connector/sink/Clickhouse.md b/docs/en/connector/sink/Clickhouse.md
index 9f091354..4d2b0bc1 100644
--- a/docs/en/connector/sink/Clickhouse.md
+++ b/docs/en/connector/sink/Clickhouse.md
@@ -9,7 +9,7 @@ Use [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) to correspo
 Engine Supported and plugin name
 
 * [x] Spark: Clickhouse
-* [ ] Flink
+* [x] Flink: Clickhouse
 
 :::
 
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index 3c294c82..b172d3b9 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
+import lombok.NonNull;
+
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -93,7 +95,7 @@ public final class TypesafeConfigUtils {
     }
 
     @SuppressWarnings("unchecked")
-    public static <T> T getConfig(final Config config, final String configKey, final T defaultValue) {
+    public static <T> T getConfig(final Config config, final String configKey, @NonNull final T defaultValue) {
         if (defaultValue.getClass().equals(Long.class)) {
             return config.hasPath(configKey) ? (T) Long.valueOf(config.getString(configKey)) : defaultValue;
         }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
new file mode 100644
index 00000000..28ae40d5
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+public class RetryUtils {
+
+    /**
+     * Runs the given execution, retrying on exceptions, and returns its first successful result.
+     *
+     * @param execution     execution to run; any exception it throws counts as a failed attempt
+     * @param retryMaterial retry settings: retry times, retry condition and whether failures are thrown
+     * @param <T>           result type
+     * @return result of execution, or null if all attempts failed and shouldThrowException is false
+     */
+    public static <T> T retryWithException(Execution<T, Exception> execution, RetryMaterial retryMaterial) throws Exception {
+        final RetryCondition<Exception> retryCondition = retryMaterial.getRetryCondition();
+        final int retryTimes = retryMaterial.getRetryTimes();
+
+        if (retryMaterial.getRetryTimes() < 0) { // only negative values are rejected; 0 means a single attempt
+            throw new IllegalArgumentException("Retry times must be greater than 0"); // NOTE(review): wording disagrees with the check, which accepts 0
+        }
+        int i = 0; // attempts made so far
+        do {
+            i++;
+            try {
+                return execution.execute();
+            } catch (Exception e) {
+                if (retryCondition != null && !retryCondition.canRetry(e)) { // a null condition means every exception is retried
+                    if (retryMaterial.shouldThrowException()) {
+                        throw e;
+                    }
+                } // NOTE(review): non-retryable failure with shouldThrowException=false falls through and is retried anyway - confirm intended
+            }
+        } while (i <= retryTimes); // at most retryTimes + 1 attempts in total
+        if (retryMaterial.shouldThrowException()) {
+            throw new RuntimeException("Execute given execution failed after retry " + retryTimes + " times"); // NOTE(review): last caught exception is not attached as cause
+        }
+        return null;
+    }
+
+    public static class RetryMaterial {
+        /**
+         * Number of retries after the first attempt; if set to 1, the execution runs at most twice.
+         * Must not be negative; 0 is accepted by retryWithException and means no retry.
+         */
+        private final int retryTimes;
+        /**
+         * If true, a failure is surfaced as an exception; if false, retryWithException returns null once attempts run out.
+         */
+        private final boolean shouldThrowException;
+        // Decides from the thrown exception whether to retry; a result-based condition could be added in the future.
+        private final RetryCondition<Exception> retryCondition;
+
+        public RetryMaterial(int retryTimes, boolean shouldThrowException, RetryCondition<Exception> retryCondition) {
+            this.retryTimes = retryTimes;
+            this.shouldThrowException = shouldThrowException;
+            this.retryCondition = retryCondition;
+        }
+
+        public int getRetryTimes() {
+            return retryTimes;
+        }
+
+        public boolean shouldThrowException() {
+            return shouldThrowException;
+        }
+
+        public RetryCondition<Exception> getRetryCondition() {
+            return retryCondition;
+        }
+    }
+
+    public interface Execution<T, E extends Exception> { // a unit of work that returns T or throws E
+        T execute() throws E;
+    }
+
+    public interface RetryCondition<T> { // returning false marks the given failure as non-retryable
+        boolean canRetry(T input);
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
index 438ee62d..165b80a1 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
@@ -41,6 +41,7 @@
         <module>seatunnel-connector-flink-socket</module>
         <module>seatunnel-connector-flink-doris</module>
         <module>seatunnel-connector-flink-influxdb</module>
+        <module>seatunnel-connector-flink-clickhouse</module>
     </modules>
 
 </project>
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
similarity index 60%
copy from seatunnel-e2e/seatunnel-flink-e2e/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
index 2be116e6..bf198afd 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
@@ -1,42 +1,64 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
+
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
+
        http://www.apache.org/licenses/LICENSE-2.0
+
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
+
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-flink</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-flink-e2e</artifactId>
-    <packaging>jar</packaging>
+    <artifactId>seatunnel-connector-flink-clickhouse</artifactId>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-flink</artifactId>
+            <artifactId>seatunnel-api-flink</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
         </dependency>
 
         <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>testcontainers</artifactId>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.sshd</groupId>
+            <artifactId>sshd-scp</artifactId>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/ConfigKey.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/ConfigKey.java
new file mode 100644
index 00000000..c4fc480d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/ConfigKey.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse;
+
+public class ConfigKey { // string constants naming the configuration keys of the Clickhouse connector
+
+    /**
+     * Bulk size of Clickhouse JDBC
+     */
+    public static final String BULK_SIZE = "bulk_size";
+
+    /**
+     * Clickhouse JDBC retry times
+     */
+    public static final String RETRY = "retry";
+
+    /**
+     * Clickhouse fields
+     */
+    public static final String FIELDS = "fields";
+
+    /**
+     * Clickhouse server host
+     */
+    public static final String HOST = "host";
+
+    /**
+     * Clickhouse table name
+     */
+    public static final String TABLE = "table";
+
+    /**
+     * Clickhouse database name
+     */
+    public static final String DATABASE = "database";
+
+    /**
+     * Clickhouse server username
+     */
+    public static final String USERNAME = "username";
+
+    /**
+     * Clickhouse server password
+     */
+    public static final String PASSWORD = "password";
+
+    /**
+     * Split mode when the table uses the distributed engine
+     */
+    public static final String SPLIT_MODE = "split_mode";
+
+    /**
+     * When split_mode is true, the sharding key used for splitting
+     */
+    public static final String SHARDING_KEY = "sharding_key";
+
+    /**
+     * The retry codes when using Clickhouse JDBC
+     */
+    public static final String RETRY_CODES = "retry_codes";
+
+    /**
+     * Path of the clickhouse-local program used by the ClickhouseFile sink connector
+     */
+    public static final String CLICKHOUSE_LOCAL_PATH = "clickhouse_local_path";
+
+    /**
+     * The method used to copy Clickhouse files
+     */
+    public static final String COPY_METHOD = "copy_method";
+
+    /**
+     * The size of each batch when reading temporary data into a local file.
+     */
+    public static final String TMP_BATCH_CACHE_LINE = "tmp_batch_cache_line";
+
+    /**
+     * Whether the Clickhouse server node is password-free.
+     */
+    public static final String NODE_FREE_PASSWORD = "node_free_password";
+
+    /**
+     * The password of Clickhouse server node
+     */
+    public static final String NODE_PASS = "node_pass";
+
+    /**
+     * The address of Clickhouse server node
+     */
+    public static final String NODE_ADDRESS = "node_address";
+
+    public static final String CLICKHOUSE_PREFIX = "clickhouse."; // NOTE(review): presumably the prefix of pass-through Clickhouse options - confirm against usage
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/DistributedEngine.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/DistributedEngine.java
new file mode 100644
index 00000000..eb8ebe0a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/DistributedEngine.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.pojo;
+
+import java.io.Serializable;
+
+public class DistributedEngine implements Serializable { // mutable holder for a cluster name, a database and a table
+    private static final long serialVersionUID = -1L;
+    private String clusterName;
+    private String database;
+    private String table;
+
+    public DistributedEngine(String clusterName, String database, String table) { // stores the arguments as given, no validation
+        this.clusterName = clusterName;
+        this.database = database;
+        this.table = table;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
similarity index 59%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
index e31a7548..68e2178a 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
@@ -15,21 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.fake;
+package org.apache.seatunnel.flink.clickhouse.pojo;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+public class IntHolder {
+    private int value;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.testcontainers.containers.Container;
-
-import java.io.IOException;
-
-public class FakeSourceToConsoleIT extends FlinkContainer {
+    public int getValue() {
+        return value;
+    }
 
-    @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
-        Assert.assertEquals(0, execResult.getExitCode());
+    public void setValue(int value) {
+        this.value = value;
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/Shard.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/Shard.java
new file mode 100644
index 00000000..20f34be1
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/Shard.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.pojo;
+
+import java.io.Serializable;
+
+public class Shard implements Serializable { // value object for one shard entry: numbers, host, port and database
+    private static final long serialVersionUID = -1L;
+
+    private final int shardNum;
+    private final int shardWeight;
+    private final int replicaNum;
+    private final String hostname;
+    private final String hostAddress;
+    private final String port;
+    private final String database;
+
+    // cached result of hashCode(); -1 means not computed yet (a genuine hash of -1 is just recomputed on every call)
+    private int hashCode = -1;
+
+    public Shard(int shardNum,
+                 int shardWeight,
+                 int replicaNum,
+                 String hostname,
+                 String hostAddress,
+                 String port,
+                 String database) {
+        this.shardNum = shardNum;
+        this.shardWeight = shardWeight;
+        this.replicaNum = replicaNum;
+        this.hostname = hostname;
+        this.hostAddress = hostAddress;
+        this.port = port;
+        this.database = database;
+    }
+
+    public int getShardNum() {
+        return shardNum;
+    }
+
+    public int getShardWeight() {
+        return shardWeight;
+    }
+
+    public int getReplicaNum() {
+        return replicaNum;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public String getHostAddress() {
+        return hostAddress;
+    }
+
+    public String getPort() {
+        return port;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getJdbcUrl() { // builds jdbc:clickhouse://<hostAddress>:<port>/<database>
+        return "jdbc:clickhouse://" + hostAddress + ":" + port + "/" + database;
+    }
+
+    @Override
+    public boolean equals(Object o) { // equal only when all seven constructor fields match; the hash cache is ignored
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Shard shard = (Shard) o;
+
+        if (shardNum != shard.shardNum) {
+            return false;
+        }
+        if (shardWeight != shard.shardWeight) {
+            return false;
+        }
+        if (replicaNum != shard.replicaNum) {
+            return false;
+        }
+        if (!hostname.equals(shard.hostname)) { // NOTE(review): string fields are dereferenced without null checks - assumes they are never null
+            return false;
+        }
+        if (!hostAddress.equals(shard.hostAddress)) {
+            return false;
+        }
+        if (!port.equals(shard.port)) {
+            return false;
+        }
+        return database.equals(shard.database);
+    }
+
+    @Override
+    @SuppressWarnings("magicnumber")
+    public int hashCode() { // combines the same seven fields that equals() compares
+        if (hashCode != -1) { // reuse the value computed by an earlier call
+            return hashCode;
+        }
+        int result = shardNum;
+        result = 31 * result + shardWeight;
+        result = 31 * result + replicaNum;
+        result = 31 * result + hostname.hashCode();
+        result = 31 * result + hostAddress.hashCode();
+        result = 31 * result + port.hashCode();
+        result = 31 * result + database.hashCode();
+        hashCode = result;
+        return hashCode;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/ShardMetadata.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/ShardMetadata.java
new file mode 100644
index 00000000..96e27336
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/ShardMetadata.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.pojo;
+
+import java.io.Serializable;
+
+public class ShardMetadata implements Serializable { // mutable holder for shard key, target table, split mode and default shard
+
+    private static final long serialVersionUID = -1L;
+
+    private String shardKey;
+    private String shardKeyType;
+    private String database;
+    private String table;
+    private boolean splitMode;
+    private Shard defaultShard;
+
+    public ShardMetadata(String shardKey,
+                         String shardKeyType,
+                         String database,
+                         String table,
+                         boolean splitMode,
+                         Shard defaultShard) {
+        this.shardKey = shardKey;
+        this.shardKeyType = shardKeyType;
+        this.database = database;
+        this.table = table;
+        this.splitMode = splitMode;
+        this.defaultShard = defaultShard;
+    }
+
+    public String getShardKey() {
+        return shardKey;
+    }
+
+    public void setShardKey(String shardKey) {
+        this.shardKey = shardKey;
+    }
+
+    public String getShardKeyType() {
+        return shardKeyType;
+    }
+
+    public void setShardKeyType(String shardKeyType) {
+        this.shardKeyType = shardKeyType;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public boolean getSplitMode() {
+        return splitMode;
+    }
+
+    public void setSplitMode(boolean splitMode) {
+        this.splitMode = splitMode;
+    }
+
+    public Shard getDefaultShard() {
+        return defaultShard;
+    }
+
+    public void setDefaultShard(Shard defaultShard) {
+        this.defaultShard = defaultShard;
+    }
+
+    @Override
+    public boolean equals(Object o) { // compares all six fields; NOTE(review): no null checks, assumes object fields are non-null
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ShardMetadata that = (ShardMetadata) o;
+
+        if (splitMode != that.splitMode) {
+            return false;
+        }
+        if (!shardKey.equals(that.shardKey)) {
+            return false;
+        }
+        if (!shardKeyType.equals(that.shardKeyType)) {
+            return false;
+        }
+        if (!database.equals(that.database)) {
+            return false;
+        }
+        if (!table.equals(that.table)) {
+            return false;
+        }
+        return defaultShard.equals(that.defaultShard);
+    }
+
+    @Override
+    @SuppressWarnings("magicnumber")
+    public int hashCode() { // NOTE(review): built from mutable fields, so the value changes if a setter is called afterwards
+        int result = shardKey.hashCode();
+        result = 31 * result + shardKeyType.hashCode();
+        result = 31 * result + database.hashCode();
+        result = 31 * result + table.hashCode();
+        result = 31 * result + (splitMode ? 1 : 0);
+        result = 31 * result + defaultShard.hashCode();
+        return result;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
new file mode 100644
index 00000000..8c3d2a80
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink;
+
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.BULK_SIZE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.DATABASE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.FIELDS;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.HOST;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.PASSWORD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.RETRY;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.RETRY_CODES;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.SHARDING_KEY;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.SPLIT_MODE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.TABLE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
+import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.types.Row;
+import ru.yandex.clickhouse.ClickHouseConnection;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink batch sink plugin that writes {@link Row} records to a ClickHouse table.
+ *
+ * <p>{@link #prepare(FlinkEnvironment)} applies default options, reads the target table schema
+ * and builds the {@link ShardMetadata}; {@link #outputBatch(FlinkEnvironment, DataSet)} then
+ * passes them to a {@link ClickhouseOutputFormat}.
+ */
+@SuppressWarnings("magicnumber")
+public class ClickhouseBatchSink implements FlinkBatchSink {
+
+    private ShardMetadata shardMetadata;
+    private Config config;
+
+    // Column name -> column type as reported by the table description; filled in prepare().
+    private Map<String, String> tableSchema = new HashMap<>();
+    // Columns to write; resolved in prepare().
+    private List<String> fields;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    /**
+     * Attaches a {@link ClickhouseOutputFormat} built from the prepared state to the data set.
+     *
+     * @param env     the Flink environment (not used here)
+     * @param dataSet the rows to write
+     * @return the data sink created by {@code dataSet.output(...)}
+     */
+    @Nullable
+    @Override
+    public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
+        ClickhouseOutputFormat clickhouseOutputFormat = new ClickhouseOutputFormat(config, shardMetadata, fields, tableSchema);
+        return dataSet.output(clickhouseOutputFormat);
+    }
+
+    /**
+     * Checks that host, table, database, username and password are all configured.
+     */
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD);
+    }
+
+    /**
+     * Applies default options, loads the table schema and resolves the shard metadata and the
+     * list of columns to write.
+     *
+     * @param env the Flink environment (not used here)
+     * @throws IllegalArgumentException if the configured host is not of the form {@code host:port}
+     * @throws RuntimeException         if a configured field is missing from the table, or the
+     *                                  connection cannot be closed cleanly
+     */
+    @Override
+    public void prepare(FlinkEnvironment env) {
+        Map<String, Object> defaultConfig = ImmutableMap.<String, Object>builder()
+            .put(BULK_SIZE, 20_000)
+            .put(RETRY_CODES, new ArrayList<>())
+            .put(RETRY, 1)
+            .put(SPLIT_MODE, false)
+            .build();
+
+        config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
+
+        boolean splitMode = config.getBoolean(SPLIT_MODE);
+        String table = config.getString(TABLE);
+        String database = config.getString(DATABASE);
+        String host = config.getString(HOST);
+        // The default shard needs host and port separately; report a malformed value clearly
+        // instead of failing with an ArrayIndexOutOfBoundsException further down.
+        String[] hostAndPort = host.split(":");
+        if (hostAndPort.length < 2) {
+            throw new IllegalArgumentException("Config '" + HOST + "' must be in the form host:port, but was: " + host);
+        }
+
+        ClickhouseClient clickhouseClient = new ClickhouseClient(config);
+        try (ClickHouseConnection connection = clickhouseClient.getClickhouseConnection()) {
+            tableSchema = clickhouseClient.getClickhouseTableSchema(connection, table);
+            String shardKey = TypesafeConfigUtils.getConfig(this.config, SHARDING_KEY, "");
+            // null when no sharding key is configured or the key is not a column of the table
+            String shardKeyType = tableSchema.get(shardKey);
+            shardMetadata = new ShardMetadata(
+                shardKey,
+                shardKeyType,
+                database,
+                table,
+                splitMode,
+                new Shard(1, 1, 1, hostAndPort[0], hostAndPort[0], hostAndPort[1], database));
+
+            if (this.config.hasPath(FIELDS)) {
+                fields = this.config.getStringList(FIELDS);
+                // check if the fields exist in schema
+                for (String field : fields) {
+                    if (!tableSchema.containsKey(field)) {
+                        throw new RuntimeException("Field " + field + " does not exist in table " + table);
+                    }
+                }
+            } else {
+                // No explicit column list: write every column of the table. Leaving the list
+                // null would make the output format fail with a NullPointerException.
+                fields = new ArrayList<>(tableSchema.keySet());
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to connect to clickhouse server", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Clickhouse";
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java
new file mode 100644
index 00000000..cb392b9a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink;
+
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.BULK_SIZE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.RETRY;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.RETRY_CODES;
+
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.flink.clickhouse.pojo.IntHolder;
+import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseBatchStatement;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ShardRouter;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.ArrayInjectFunction;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.BigDecimalInjectFunction;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.ClickhouseFieldInjectFunction;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.DateInjectFunction;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.DateTimeInjectFunction;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.DoubleInjectFunction;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.FloatInjectFunction;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.IntInjectFunction;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.LongInjectFunction;
+import org.apache.seatunnel.flink.clickhouse.sink.inject.StringInjectFunction;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import ru.yandex.clickhouse.ClickHouseConnectionImpl;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
+import ru.yandex.clickhouse.ClickHouseStatement;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink output format that batches {@link Row} records into ClickHouse prepared statements,
+ * one statement per shard, and executes a batch once it holds {@code bulk_size} rows.
+ *
+ * <p>The constructor arguments are kept as fields of the format; everything that holds a
+ * connection is declared {@code transient} and created in {@link #open(int, int)}.
+ */
+@SuppressWarnings("magicnumber")
+public class ClickhouseOutputFormat extends RichOutputFormat<Row> {
+
+    private static final long serialVersionUID = -1L;
+
+    private final Config config;
+    private final List<String> fields;
+    private final Map<String, String> tableSchema;
+    private final ShardMetadata shardMetadata;
+    private final int batchSize;
+
+    // The below fields should be created by open function.
+    private transient RetryUtils.RetryMaterial retryMaterial;
+    private transient ShardRouter shardRouter;
+    private transient ClickhouseClient clickhouseClient;
+    private transient String prepareSql;
+    private transient Map<Shard, ClickhouseBatchStatement> statementMap;
+    // Field name -> function used to bind that field's value to the prepared statement.
+    private transient Map<String, ClickhouseFieldInjectFunction> fieldInjectFunctionMap;
+    private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION = new StringInjectFunction();
+
+    /**
+     * @param config        sink configuration; must contain the bulk size
+     * @param shardMetadata shard routing metadata
+     * @param fields        names of the columns to write, in statement order
+     * @param tableSchema   column name to column type of the target table
+     */
+    public ClickhouseOutputFormat(Config config,
+                                  ShardMetadata shardMetadata,
+                                  List<String> fields,
+                                  Map<String, String> tableSchema) {
+        this.config = config;
+        this.shardMetadata = shardMetadata;
+        this.fields = fields;
+        this.tableSchema = tableSchema;
+        this.batchSize = config.getInt(BULK_SIZE);
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+    }
+
+    /**
+     * Creates the retry settings, the client, the shard router and one prepared statement per
+     * shard.
+     */
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        List<Integer> retryCodes = config.getIntList(RETRY_CODES);
+        // Only SQL exceptions whose error code is in the configured list are retried.
+        retryMaterial = new RetryUtils.RetryMaterial(config.getInt(RETRY), true, exception -> {
+            if (exception instanceof SQLException) {
+                SQLException sqlException = (SQLException) exception;
+                return retryCodes.contains(sqlException.getErrorCode());
+            }
+            return false;
+        });
+        clickhouseClient = new ClickhouseClient(config);
+        fieldInjectFunctionMap = initFieldInjectFunctionMap();
+        shardRouter = new ShardRouter(clickhouseClient, shardMetadata);
+        prepareSql = initPrepareSQL();
+        statementMap = initStatementMap();
+    }
+
+    /**
+     * Adds the row to the batch of the shard it is routed to and executes that batch once it
+     * has reached the configured size.
+     */
+    @Override
+    public void writeRecord(Row row) {
+        ClickhouseBatchStatement batchStatement = statementMap.get(shardRouter.getShard(row));
+        ClickHousePreparedStatementImpl clickHouseStatement = batchStatement.getPreparedStatement();
+        IntHolder sizeHolder = batchStatement.getIntHolder();
+        // add into batch
+        addIntoBatch(row, clickHouseStatement);
+        sizeHolder.setValue(sizeHolder.getValue() + 1);
+        // flush batch
+        if (sizeHolder.getValue() >= batchSize) {
+            flush(clickHouseStatement);
+            sizeHolder.setValue(0);
+        }
+    }
+
+    /**
+     * Flushes the remaining rows of every shard and closes its statement and connection.
+     *
+     * <p>All shards are processed even if one of them fails, so no connection is left open;
+     * the first failure is rethrown afterwards with later ones attached as suppressed.
+     */
+    @Override
+    public void close() {
+        if (statementMap == null) {
+            // open() never got as far as creating the statements; nothing to release
+            return;
+        }
+        RuntimeException failure = null;
+        for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
+            try (ClickHouseConnectionImpl needClosedConnection = batchStatement.getClickHouseConnection();
+                 ClickHousePreparedStatementImpl needClosedStatement = batchStatement.getPreparedStatement()) {
+                IntHolder intHolder = batchStatement.getIntHolder();
+                if (intHolder.getValue() > 0) {
+                    flush(needClosedStatement);
+                    intHolder.setValue(0);
+                }
+            } catch (SQLException e) {
+                failure = collectFailure(failure, new RuntimeException("Failed to close prepared statement.", e));
+            } catch (RuntimeException e) {
+                failure = collectFailure(failure, e);
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /** Keeps the first failure and attaches any later one to it as a suppressed exception. */
+    private static RuntimeException collectFailure(RuntimeException first, RuntimeException next) {
+        if (first == null) {
+            return next;
+        }
+        first.addSuppressed(next);
+        return first;
+    }
+
+    /** Binds every configured field of the row to the statement and adds it to the batch. */
+    private void addIntoBatch(Row row, ClickHousePreparedStatementImpl clickHouseStatement) {
+        try {
+            for (int i = 0; i < fields.size(); i++) {
+                String fieldName = fields.get(i);
+                Object fieldValue = row.getField(fieldName);
+                if (fieldValue == null) {
+                    // field does not exist in row
+                    // todo: do we need to transform to default value of each type
+                    clickHouseStatement.setObject(i + 1, null);
+                    continue;
+                }
+                // The map is keyed by field name (see initFieldInjectFunctionMap), so it must
+                // be queried by name; querying by type never matched and silently bound every
+                // value through the string fallback.
+                fieldInjectFunctionMap
+                    .getOrDefault(fieldName, DEFAULT_INJECT_FUNCTION)
+                    .injectFields(clickHouseStatement, i + 1, fieldValue);
+            }
+            clickHouseStatement.addBatch();
+        } catch (SQLException e) {
+            throw new RuntimeException("Add row data into batch error", e);
+        }
+    }
+
+    /** Executes the statement's batch under the configured retry settings. */
+    private void flush(ClickHouseStatement clickHouseStatement) {
+        RetryUtils.Execution<Void, Exception> execution = () -> {
+            clickHouseStatement.executeBatch();
+            return null;
+        };
+        try {
+            RetryUtils.retryWithException(execution, retryMaterial);
+        } catch (Exception e) {
+            throw new RuntimeException("Clickhouse execute batch statement error", e);
+        }
+    }
+
+    /** Builds the INSERT statement with one placeholder per configured field. */
+    private String initPrepareSQL() {
+        String[] placeholder = new String[fields.size()];
+        Arrays.fill(placeholder, "?");
+
+        return String.format("INSERT INTO %s (%s) VALUES (%s)",
+            shardRouter.getShardTable(),
+            String.join(",", fields),
+            String.join(",", placeholder));
+    }
+
+    /**
+     * Creates one connection, prepared statement and row counter per shard.
+     *
+     * <p>NOTE(review): every connection is obtained from the same {@link ClickhouseClient},
+     * which is built from the configured host, regardless of the shard's own address. Confirm
+     * whether split mode is meant to connect to each shard directly.
+     */
+    private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
+        Map<Shard, ClickhouseBatchStatement> result = new HashMap<>(16);
+        shardRouter.getShards().forEach((weight, shard) -> {
+            try {
+                ClickHouseConnectionImpl clickhouseConnection = clickhouseClient.getClickhouseConnection();
+                ClickHousePreparedStatementImpl preparedStatement =
+                    (ClickHousePreparedStatementImpl) clickhouseConnection.prepareStatement(prepareSql);
+                IntHolder intHolder = new IntHolder();
+                ClickhouseBatchStatement batchStatement =
+                    new ClickhouseBatchStatement(clickhouseConnection, preparedStatement, intHolder);
+                result.put(shard, batchStatement);
+            } catch (SQLException e) {
+                throw new RuntimeException("Clickhouse prepare statement error", e);
+            }
+        });
+        return result;
+    }
+
+    /**
+     * Picks, for every configured field, the first inject function that accepts the field's
+     * column type, falling back to the string function.
+     *
+     * @return map from field name to the function that binds that field
+     */
+    private Map<String, ClickhouseFieldInjectFunction> initFieldInjectFunctionMap() {
+        Map<String, ClickhouseFieldInjectFunction> result = new HashMap<>(16);
+        List<ClickhouseFieldInjectFunction> clickhouseFieldInjectFunctions = Lists.newArrayList(
+            new ArrayInjectFunction(),
+            new BigDecimalInjectFunction(),
+            new DateInjectFunction(),
+            new DateTimeInjectFunction(),
+            new DoubleInjectFunction(),
+            new FloatInjectFunction(),
+            new IntInjectFunction(),
+            new LongInjectFunction(),
+            new StringInjectFunction()
+        );
+        // get field type
+        for (String field : fields) {
+            ClickhouseFieldInjectFunction function = DEFAULT_INJECT_FUNCTION;
+            String fieldType = tableSchema.get(field);
+            for (ClickhouseFieldInjectFunction clickhouseFieldInjectFunction : clickhouseFieldInjectFunctions) {
+                if (clickhouseFieldInjectFunction.isCurrentFieldType(fieldType)) {
+                    function = clickhouseFieldInjectFunction;
+                    break;
+                }
+            }
+            result.put(field, function);
+        }
+        return result;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseBatchStatement.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseBatchStatement.java
new file mode 100644
index 00000000..da7a0d08
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseBatchStatement.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink.client;
+
+import org.apache.seatunnel.flink.clickhouse.pojo.IntHolder;
+
+import ru.yandex.clickhouse.ClickHouseConnectionImpl;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
+
+/**
+ * Immutable holder that groups a ClickHouse connection, a prepared statement and an
+ * {@link IntHolder}. {@code ClickhouseOutputFormat} keeps one instance per shard and uses the
+ * {@link IntHolder} to count the rows added to the statement's batch since the last flush.
+ */
+public class ClickhouseBatchStatement {
+
+    private final ClickHouseConnectionImpl clickHouseConnection;
+    private final ClickHousePreparedStatementImpl preparedStatement;
+    private final IntHolder intHolder;
+
+    /**
+     * @param clickHouseConnection the connection to hold
+     * @param preparedStatement    the prepared statement to hold
+     * @param intHolder            the counter to hold
+     */
+    public ClickhouseBatchStatement(ClickHouseConnectionImpl clickHouseConnection,
+                                    ClickHousePreparedStatementImpl preparedStatement,
+                                    IntHolder intHolder) {
+        this.clickHouseConnection = clickHouseConnection;
+        this.preparedStatement = preparedStatement;
+        this.intHolder = intHolder;
+    }
+
+    /** Returns the held connection. */
+    public ClickHouseConnectionImpl getClickHouseConnection() {
+        return clickHouseConnection;
+    }
+
+    /** Returns the held prepared statement. */
+    public ClickHousePreparedStatementImpl getPreparedStatement() {
+        return preparedStatement;
+    }
+
+    /** Returns the held counter. */
+    public IntHolder getIntHolder() {
+        return intHolder;
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
new file mode 100644
index 00000000..c14b99c6
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink.client;
+
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.CLICKHOUSE_PREFIX;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.DATABASE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.HOST;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.PASSWORD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
+
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.clickhouse.pojo.DistributedEngine;
+import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.ClickHouseConnectionImpl;
+import ru.yandex.clickhouse.ClickHouseStatement;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Thin client around a {@link BalancedClickhouseDataSource} that opens connections and reads
+ * table and cluster metadata from ClickHouse.
+ */
+@SuppressWarnings("magicnumber")
+public class ClickhouseClient {
+
+    private final BalancedClickhouseDataSource balancedClickhouseDataSource;
+
+    /**
+     * Builds the data source from the configured host, database, username and password. Every
+     * option found under the ClickHouse prefix is passed to the driver as a string property.
+     *
+     * @param config sink configuration
+     */
+    public ClickhouseClient(Config config) {
+        Properties clickhouseProperties = new Properties();
+        if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) {
+            TypesafeConfigUtils.extractSubConfig(config, CLICKHOUSE_PREFIX, false).entrySet().forEach(e -> {
+                clickhouseProperties.put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
+            });
+        }
+        clickhouseProperties.put("user", config.getString(USERNAME));
+        clickhouseProperties.put("password", config.getString(PASSWORD));
+        String jdbcUrl = "jdbc:clickhouse://" + config.getString(HOST) + "/" + config.getString(DATABASE);
+        this.balancedClickhouseDataSource = new BalancedClickhouseDataSource(jdbcUrl, clickhouseProperties);
+    }
+
+    /**
+     * Opens a new connection from the data source.
+     *
+     * @return the connection; the caller is responsible for closing it
+     * @throws RuntimeException if the connection cannot be obtained
+     */
+    public ClickHouseConnectionImpl getClickhouseConnection() {
+        try {
+            return (ClickHouseConnectionImpl) balancedClickhouseDataSource.getConnection();
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot connect to clickhouse server", e);
+        }
+    }
+
+    /**
+     * Same as {@link #getClickhouseDistributedTable(ClickHouseConnection, String, String)} but
+     * on a connection opened and closed by this method.
+     */
+    public DistributedEngine getClickhouseDistributedTable(String database, String table) {
+        try (ClickHouseConnection connection = getClickhouseConnection()) {
+            return getClickhouseDistributedTable(connection, database, table);
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot get distributed table from clickhouse", e);
+        }
+    }
+
+    /**
+     * Reads the engine definition of a Distributed table and extracts its cluster, database
+     * and local table names.
+     *
+     * @param connection connection to query on; not closed by this method
+     * @param database   database of the distributed table
+     * @param table      name of the distributed table
+     * @return the parsed engine arguments
+     * @throws RuntimeException if the table is not found as a Distributed table or the query fails
+     */
+    public DistributedEngine getClickhouseDistributedTable(ClickHouseConnection connection, String database, String table) {
+        String sql = String.format("select engine_full from system.tables where database = '%s' and name = '%s' and engine = 'Distributed'", database, table);
+        try (ClickHouseStatement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery(sql)) {
+            if (resultSet.next()) {
+                // engineFull field will be like : Distributed(cluster, database, table[, sharding_key[, policy_name]])
+                String engineFull = resultSet.getString(1);
+                // substring(12) drops the leading "Distributed("
+                List<String> infos = Arrays.stream(engineFull.substring(12).split(","))
+                    .map(s -> s.replace("'", "").trim()).collect(Collectors.toList());
+                // When the table is the last argument it still carries the closing parenthesis.
+                // String.replace is literal, so the target must be ")" and not the regex "\\)".
+                return new DistributedEngine(infos.get(0), infos.get(1), infos.get(2).replace(")", "").trim());
+            }
+            throw new RuntimeException("Cannot get distributed table from clickhouse, resultSet is empty");
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot get distributed table from clickhouse", e);
+        }
+    }
+
+    /**
+     * Same as {@link #getClickhouseTableSchema(ClickHouseConnection, String)} but on a
+     * connection opened and closed by this method.
+     */
+    public Map<String, String> getClickhouseTableSchema(String table) {
+        try (ClickHouseConnection connection = getClickhouseConnection()) {
+            return getClickhouseTableSchema(connection, table);
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot get table schema from clickhouse", e);
+        }
+    }
+
+    /**
+     * Describes the table and returns its columns in the order the server reports them.
+     *
+     * @param connection connection to query on; not closed by this method
+     * @param table      table to describe
+     * @return insertion-ordered map from the first to the second column of each {@code desc} row
+     *         (column name to column type)
+     * @throws RuntimeException if the query fails
+     */
+    public Map<String, String> getClickhouseTableSchema(ClickHouseConnection connection, String table) {
+        String sql = "desc " + table;
+        Map<String, String> schema = new LinkedHashMap<>();
+        try (ClickHouseStatement clickHouseStatement = connection.createStatement();
+             ResultSet resultSet = clickHouseStatement.executeQuery(sql)) {
+            while (resultSet.next()) {
+                schema.put(resultSet.getString(1), resultSet.getString(2));
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot get table schema from clickhouse", e);
+        }
+        return schema;
+    }
+
+    /**
+     * Lists the replicas of a cluster from {@code system.clusters} as {@link Shard} objects.
+     *
+     * <p>The {@code port} column is selected but not read; every shard gets the caller-supplied
+     * {@code port} instead.
+     *
+     * @param connection  connection to query on; not closed by this method
+     * @param clusterName cluster to list
+     * @param database    database assigned to every returned shard
+     * @param port        port assigned to every returned shard
+     * @return one shard per row of the query result, possibly empty
+     * @throws RuntimeException if the query fails
+     */
+    public List<Shard> getClusterShardList(ClickHouseConnection connection, String clusterName, String database, String port) {
+        String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "'";
+        List<Shard> shardList = new ArrayList<>();
+        try (ClickHouseStatement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery(sql)) {
+            while (resultSet.next()) {
+                shardList.add(new Shard(
+                    resultSet.getInt(1),
+                    resultSet.getInt(2),
+                    resultSet.getInt(3),
+                    resultSet.getString(4),
+                    resultSet.getString(5),
+                    port,
+                    database));
+            }
+            return shardList;
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot get cluster shard list from clickhouse", e);
+        }
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java
new file mode 100644
index 00000000..e6ad4438
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink.client;
+
+import org.apache.seatunnel.flink.clickhouse.pojo.DistributedEngine;
+import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
+
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.types.Row;
+import ru.yandex.clickhouse.ClickHouseConnection;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Chooses the {@link Shard} a row is written to.
+ *
+ * <p>Without split mode there is a single shard, the default one from the metadata. In split
+ * mode the shards of the distributed table's cluster are stored under their cumulative start
+ * weight (0, w0, w0 + w1, ...). A row is mapped to an offset in {@code [0, total weight)},
+ * either from the hash of its shard key value or at random, and goes to the shard whose
+ * weight range contains that offset.
+ */
+public class ShardRouter {
+
+    private String shardTable;
+    private final String table;
+    private int shardWeightCount;
+    private final TreeMap<Integer, Shard> shards;
+    private final String shardKey;
+    private final String shardKeyType;
+    private final boolean splitMode;
+
+    private final XXHash64 hashInstance = XXHashFactory.fastestInstance().hash64();
+
+    /**
+     * @param clickhouseClient client used to read the distributed table and cluster layout
+     * @param shardMetadata    routing settings
+     * @throws IllegalArgumentException if a shard key is given but its type is empty
+     * @throws IllegalStateException    if split mode finds no shard with a positive weight
+     */
+    public ShardRouter(ClickhouseClient clickhouseClient, ShardMetadata shardMetadata) {
+        this.shards = new TreeMap<>();
+        this.shardKey = shardMetadata.getShardKey();
+        this.shardKeyType = shardMetadata.getShardKeyType();
+        this.splitMode = shardMetadata.getSplitMode();
+        this.table = shardMetadata.getTable();
+        if (StringUtils.isNotEmpty(shardKey) && StringUtils.isEmpty(shardKeyType)) {
+            throw new IllegalArgumentException("Shard key " + shardKey + " not found in table " + table);
+        }
+
+        try (
+            ClickHouseConnection connection = clickhouseClient.getClickhouseConnection()) {
+            if (splitMode) {
+                DistributedEngine localTable = clickhouseClient.getClickhouseDistributedTable(connection, shardMetadata.getDatabase(), table);
+                this.shardTable = localTable.getTable();
+                List<Shard> shardList = clickhouseClient.getClusterShardList(connection, localTable.getClusterName(), localTable.getDatabase(), shardMetadata.getDefaultShard().getPort());
+                int weight = 0;
+                for (Shard shard : shardList) {
+                    shards.put(weight, shard);
+                    weight += shard.getShardWeight();
+                }
+                if (weight <= 0) {
+                    // Routing divides by the total weight; fail here with a clear message
+                    // instead of on the first row.
+                    throw new IllegalStateException("No shard with a positive weight found for cluster " + localTable.getClusterName());
+                }
+                shardWeightCount = weight;
+            } else {
+                shards.put(0, shardMetadata.getDefaultShard());
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * Returns the table rows are inserted into: the distributed table's local table in split
+     * mode, otherwise the configured table.
+     */
+    public String getShardTable() {
+        return splitMode ? shardTable : table;
+    }
+
+    /**
+     * Selects the shard for a row.
+     *
+     * @param row the row to route
+     * @return the only shard when split mode is off; otherwise the shard chosen by the hash of
+     *         the row's shard key value, or a weight-proportional random shard when no shard
+     *         key is configured or the row's value for it is {@code null}
+     */
+    public Shard getShard(Row row) {
+        if (!splitMode) {
+            return shards.firstEntry().getValue();
+        }
+        Object shardValue = StringUtils.isEmpty(shardKey) ? null : row.getField(shardKey);
+        int offset;
+        if (shardValue == null) {
+            // Uniform in [0, total weight). The former nextInt(total + 1) could produce 0 as the
+            // lookup bound, for which lowerEntry has no entry and the call failed with an NPE.
+            offset = ThreadLocalRandom.current().nextInt(shardWeightCount);
+        } else {
+            long hash = hashInstance.hash(ByteBuffer.wrap(shardValue.toString().getBytes(StandardCharsets.UTF_8)), 0);
+            // Clear the sign bit first, then reduce modulo the total weight. '%' binds tighter
+            // than '&', so the parentheses are required; without them the hash was only masked
+            // with (Long.MAX_VALUE % total) and some shards were never selected.
+            offset = (int) ((hash & Long.MAX_VALUE) % shardWeightCount);
+        }
+        // Greatest start weight <= offset, i.e. the shard whose range contains the offset.
+        return shards.lowerEntry(offset + 1).getValue();
+    }
+
+    /** Returns the shards keyed by cumulative start weight (a single entry at 0 without split mode). */
+    public TreeMap<Integer, Shard> getShards() {
+        return shards;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/ArrayInjectFunction.java
similarity index 55%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/ArrayInjectFunction.java
index e31a7548..39a92a43 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/ArrayInjectFunction.java
@@ -15,21 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.fake;
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.testcontainers.containers.Container;
+import java.sql.SQLException;
+import java.util.regex.Pattern;
 
-import java.io.IOException;
+public class ArrayInjectFunction implements ClickhouseFieldInjectFunction {
 
-public class FakeSourceToConsoleIT extends FlinkContainer {
+    private static final Pattern PATTERN = Pattern.compile("(Array.*)");
 
-    @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
-        Assert.assertEquals(0, execResult.getExitCode());
+    @Override
+    public void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException {
+        statement.setArray(index, (java.sql.Array) value);
+    }
+
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return PATTERN.matcher(fieldType).matches();
     }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/BigDecimalInjectFunction.java
similarity index 59%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/BigDecimalInjectFunction.java
index e31a7548..238257dc 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/BigDecimalInjectFunction.java
@@ -15,21 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.fake;
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.testcontainers.containers.Container;
+import java.sql.SQLException;
 
-import java.io.IOException;
-
-public class FakeSourceToConsoleIT extends FlinkContainer {
+public class BigDecimalInjectFunction implements ClickhouseFieldInjectFunction {
+    @Override
+    public void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException {
+        statement.setBigDecimal(index, (java.math.BigDecimal) value);
+    }
 
-    @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
-        Assert.assertEquals(0, execResult.getExitCode());
+    /**
+     * Matches only the bare type name {@code Decimal}.
+     *
+     * <p>NOTE(review): ClickHouse usually reports decimal columns with parameters, e.g.
+     * {@code Decimal(9, 2)}; confirm that callers normalise the type name before calling this
+     * method, otherwise such columns will not be matched here.
+     *
+     * @param fieldType field type to check
+     * @return true if {@code fieldType} equals {@code "Decimal"}
+     */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return "Decimal".equals(fieldType);
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java
new file mode 100644
index 00000000..f794b0c2
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
+
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
+
+import java.sql.SQLException;
+
+/**
+ * Injects a field value into a ClickHouse prepared statement; used to transform a Java type into
+ * a ClickHouse type.
+ */
+public interface ClickhouseFieldInjectFunction {
+
+    /**
+     * Injects the value into the statement.
+     *
+     * @param statement statement to inject into
+     * @param index     index of the parameter in the statement
+     * @param value     value to inject
+     * @throws SQLException if setting the parameter on the statement fails
+     */
+    void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException;
+
+    /**
+     * Tells whether the given field type should be injected by this function.
+     *
+     * @param fieldType field type to check
+     * @return true if the field type needs to be injected by this function
+     */
+    boolean isCurrentFieldType(String fieldType);
+
+}
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/DateInjectFunction.java
similarity index 54%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/DateInjectFunction.java
index e31a7548..233767b7 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/DateInjectFunction.java
@@ -15,21 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.fake;
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.testcontainers.containers.Container;
+import java.sql.Date;
+import java.sql.SQLException;
 
-import java.io.IOException;
-
-public class FakeSourceToConsoleIT extends FlinkContainer {
+public class DateInjectFunction implements ClickhouseFieldInjectFunction {
+    @Override
+    public void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException {
+        if (value instanceof Date) {
+            statement.setDate(index, (Date) value);
+        } else {
+            statement.setDate(index, Date.valueOf(value.toString()));
+        }
+    }
 
-    @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
-        Assert.assertEquals(0, execResult.getExitCode());
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return "Date".equals(fieldType);
     }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/DateTimeInjectFunction.java
similarity index 52%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/DateTimeInjectFunction.java
index e31a7548..a1c6b55b 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/DateTimeInjectFunction.java
@@ -15,21 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.fake;
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.testcontainers.containers.Container;
+import java.sql.SQLException;
+import java.sql.Timestamp;
 
-import java.io.IOException;
-
-public class FakeSourceToConsoleIT extends FlinkContainer {
+public class DateTimeInjectFunction implements ClickhouseFieldInjectFunction {
+    @Override
+    public void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException {
+        if (value instanceof Timestamp) {
+            statement.setTimestamp(index, (Timestamp) value);
+        } else {
+            statement.setTimestamp(index, Timestamp.valueOf(value.toString()));
+        }
+    }
 
-    @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
-        Assert.assertEquals(0, execResult.getExitCode());
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return "DateTime".equals(fieldType);
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/DoubleInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/DoubleInjectFunction.java
new file mode 100644
index 00000000..94136e40
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/DoubleInjectFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
+
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
+
+import java.math.BigDecimal;
+import java.sql.SQLException;
+
+/**
+ * Binds numeric values as a double parameter of a ClickHouse prepared statement.
+ */
+public class DoubleInjectFunction implements ClickhouseFieldInjectFunction {
+    /**
+     * Sets the parameter at {@code index} to the double value of {@code value}.
+     *
+     * @param statement statement to inject into
+     * @param index     index of the parameter in the statement
+     * @param value     numeric value to inject
+     * @throws SQLException       if setting the parameter on the statement fails
+     * @throws ClassCastException if {@code value} is not a {@link Number}
+     */
+    @Override
+    public void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException {
+        if (value instanceof BigDecimal) {
+            statement.setDouble(index, ((BigDecimal) value).doubleValue());
+        } else {
+            // Widen any other Number (Double, Float, Long, Integer, ...) instead of casting to
+            // Double only: this function also claims the integer column types listed below, and
+            // a plain (Double) cast fails for every boxed type other than Double.
+            statement.setDouble(index, ((Number) value).doubleValue());
+        }
+    }
+
+    /**
+     * Tells whether the given field type is handled by this function.
+     *
+     * @param fieldType field type to check
+     * @return true for {@code UInt32}, {@code UInt64}, {@code Int64} and {@code Float64}
+     */
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return "UInt32".equals(fieldType)
+            || "UInt64".equals(fieldType)
+            || "Int64".equals(fieldType)
+            || "Float64".equals(fieldType);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/FloatInjectFunction.java
similarity index 53%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/FloatInjectFunction.java
index e31a7548..ed91455d 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/FloatInjectFunction.java
@@ -15,21 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.fake;
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.testcontainers.containers.Container;
+import java.math.BigDecimal;
+import java.sql.SQLException;
 
-import java.io.IOException;
-
-public class FakeSourceToConsoleIT extends FlinkContainer {
+public class FloatInjectFunction implements ClickhouseFieldInjectFunction {
+    /**
+     * Sets the parameter at {@code index} to the float value of {@code value}.
+     *
+     * @param statement statement to inject into
+     * @param index     index of the parameter in the statement
+     * @param value     numeric value to inject
+     * @throws SQLException       if setting the parameter on the statement fails
+     * @throws ClassCastException if {@code value} is not a {@link Number}
+     */
+    @Override
+    public void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException {
+        if (value instanceof BigDecimal) {
+            statement.setFloat(index, ((BigDecimal) value).floatValue());
+        } else {
+            // Convert through Number so that Double, Integer, etc. are accepted as well; a plain
+            // (Float) cast fails for every boxed type other than Float.
+            statement.setFloat(index, ((Number) value).floatValue());
+        }
+    }
 
-    @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
-        Assert.assertEquals(0, execResult.getExitCode());
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return "Float32".equals(fieldType);
     }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/IntInjectFunction.java
similarity index 54%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/IntInjectFunction.java
index e31a7548..647bc969 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/IntInjectFunction.java
@@ -15,21 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.fake;
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.testcontainers.containers.Container;
+import java.sql.SQLException;
 
-import java.io.IOException;
-
-public class FakeSourceToConsoleIT extends FlinkContainer {
+public class IntInjectFunction implements ClickhouseFieldInjectFunction {
+    /**
+     * Sets the parameter at {@code index} to the int value of {@code value}.
+     *
+     * @param statement statement to inject into
+     * @param index     index of the parameter in the statement
+     * @param value     {@link Integer}, {@link Short} or {@link Byte} value to inject
+     * @throws SQLException       if setting the parameter on the statement fails
+     * @throws ClassCastException if {@code value} is of any other type
+     */
+    @Override
+    public void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException {
+        if (value instanceof Byte || value instanceof Short) {
+            // The plain (int) cast only accepts Integer. Byte and Short widen losslessly, which
+            // presumably is what the 8/16-bit column types claimed by this function receive.
+            statement.setInt(index, ((Number) value).intValue());
+        } else {
+            statement.setInt(index, (int) value);
+        }
+    }
 
-    @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
-        Assert.assertEquals(0, execResult.getExitCode());
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return "Int8".equals(fieldType)
+            || "UInt8".equals(fieldType)
+            || "Int16".equals(fieldType)
+            || "UInt16".equals(fieldType)
+            || "Int32".equals(fieldType);
     }
+
 }
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/LongInjectFunction.java
similarity index 57%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/LongInjectFunction.java
index e31a7548..7a2abc0b 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/LongInjectFunction.java
@@ -15,21 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.fake;
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.testcontainers.containers.Container;
+import java.sql.SQLException;
 
-import java.io.IOException;
+public class LongInjectFunction implements ClickhouseFieldInjectFunction {
 
-public class FakeSourceToConsoleIT extends FlinkContainer {
+    @Override
+    public void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException {
+        statement.setLong(index, (Long) value);
+    }
 
-    @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
-        Assert.assertEquals(0, execResult.getExitCode());
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return "UInt32".equals(fieldType)
+            || "UInt64".equals(fieldType)
+            || "Int64".equals(fieldType);
     }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/StringInjectFunction.java
similarity index 52%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/StringInjectFunction.java
index e31a7548..9098692e 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/inject/StringInjectFunction.java
@@ -15,21 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.fake;
+package org.apache.seatunnel.flink.clickhouse.sink.inject;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.testcontainers.containers.Container;
+import java.sql.SQLException;
+import java.util.regex.Pattern;
 
-import java.io.IOException;
+public class StringInjectFunction implements ClickhouseFieldInjectFunction {
 
-public class FakeSourceToConsoleIT extends FlinkContainer {
+    private static final Pattern LOW_CARDINALITY_PATTERN = Pattern.compile("LowCardinality\\((.*)\\)");
 
-    @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
-        Assert.assertEquals(0, execResult.getExitCode());
+    @Override
+    public void injectFields(ClickHousePreparedStatementImpl statement, int index, Object value) throws SQLException {
+        statement.setString(index, value.toString());
+    }
+
+    @Override
+    public boolean isCurrentFieldType(String fieldType) {
+        return "String".equals(fieldType) || LOW_CARDINALITY_PATTERN.matcher(fieldType).matches();
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
new file mode 100644
index 00000000..35104c33
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.flink.clickhouse.sink.ClickhouseBatchSink
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml b/seatunnel-core/seatunnel-core-flink/pom.xml
index 301c9ccf..d0f03bb0 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -96,6 +96,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-flink-clickhouse</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-transform-flink-sql</artifactId>
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
index 2be116e6..ff71391f 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
@@ -37,6 +37,13 @@
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-flink-clickhouse</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index 6e08bcbc..9489acd3 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -46,7 +46,7 @@ public abstract class FlinkContainer {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
 
     private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
-    public static final Network NETWORK = Network.newNetwork();
+    protected static final Network NETWORK = Network.newNetwork();
 
     protected GenericContainer<?> jobManager;
     protected GenericContainer<?> taskManager;
@@ -85,7 +85,7 @@ public abstract class FlinkContainer {
         Startables.deepStart(Stream.of(jobManager)).join();
         Startables.deepStart(Stream.of(taskManager)).join();
         copySeaTunnelFlinkCoreJar();
-        LOG.info("Containers are started.");
+        LOG.info("Flink containers are started.");
     }
 
     @After
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
new file mode 100644
index 00000000..9a349712
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.clickhouse;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.ClickHouseStatement;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test that runs a SeaTunnel Flink job writing from the fake source into a ClickHouse
+ * container and checks that rows arrived in the target table.
+ */
+public class FakeSourceToClickhouseIT extends FlinkContainer {
+
+    private GenericContainer<?> clickhouseServer;
+    private BalancedClickhouseDataSource dataSource;
+    private static final String CLICKHOUSE_DOCKER_IMAGE = "yandex/clickhouse-server:21.3.20.1";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToClickhouseIT.class);
+
+    /**
+     * Starts the ClickHouse container on the shared network, then creates the data source and the
+     * target table.
+     *
+     * @throws InterruptedException if the start-up wait is interrupted
+     */
+    @Before
+    @SuppressWarnings("magicnumber")
+    public void startClickhouseContainer() throws InterruptedException {
+        clickhouseServer = new GenericContainer<>(CLICKHOUSE_DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases("clickhouse")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        clickhouseServer.setPortBindings(Lists.newArrayList("8123:8123"));
+        Startables.deepStart(Stream.of(clickhouseServer)).join();
+        LOGGER.info("Clickhouse container started");
+        // wait for clickhouse fully start
+        Thread.sleep(5000L);
+        dataSource = createDatasource();
+        initializeClickhouseTable();
+    }
+
+    /**
+     * Test insert into clickhouse table from fake source by flink batch mode.
+     *
+     * @throws IOException          read from conf file error.
+     * @throws InterruptedException execute flink job error.
+     * @throws SQLException         execute clickhouse sql error.
+     */
+    @Test
+    public void testFakeSourceToClickhouseSink() throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/clickhouse/fakesource_to_clickhouse.conf");
+        Assert.assertEquals(0, execResult.getExitCode());
+        // query result; statement and result set are closed together with the connection
+        try (ClickHouseConnection connection = dataSource.getConnection();
+             ClickHouseStatement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery("select * from default.seatunnel_console")) {
+            List<String> result = Lists.newArrayList();
+            while (resultSet.next()) {
+                result.add(resultSet.getString("name"));
+            }
+            Assert.assertFalse(result.isEmpty());
+        }
+    }
+
+    /**
+     * Builds the data source used by the test to reach ClickHouse through the port bound on
+     * localhost.
+     */
+    private BalancedClickhouseDataSource createDatasource() {
+        String jdbcUrl = "jdbc:clickhouse://localhost:8123/default";
+        Properties properties = new Properties();
+        properties.setProperty("user", "default");
+        properties.setProperty("password", "");
+        return new BalancedClickhouseDataSource(jdbcUrl, properties);
+    }
+
+    /**
+     * Creates the table the job writes into; any SQL failure is rethrown as a RuntimeException.
+     */
+    private void initializeClickhouseTable() {
+        try (ClickHouseConnection connection = dataSource.getConnection();
+             ClickHouseStatement statement = connection.createStatement()) {
+            String initializeTableSql = "CREATE TABLE default.seatunnel_console" +
+                "(" +
+                "    `name` Nullable(String)" +
+                ")ENGINE = Memory";
+            statement.execute(initializeTableSql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing clickhouse table failed", e);
+        }
+    }
+
+    /**
+     * Stops the ClickHouse container if it was created.
+     */
+    @After
+    public void closeClickhouseContainer() {
+        if (clickhouseServer != null) {
+            clickhouseServer.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
index e31a7548..17f5324b 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
@@ -28,7 +28,7 @@ import java.io.IOException;
 public class FakeSourceToConsoleIT extends FlinkContainer {
 
     @Test
-    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
+    public void testFakeSourceToConsoleSink() throws IOException, InterruptedException {
         Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
         Assert.assertEquals(0, execResult.getExitCode());
     }
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/clickhouse/fakesource_to_clickhouse.conf b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/clickhouse/fakesource_to_clickhouse.conf
new file mode 100644
index 00000000..e227d529
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/clickhouse/fakesource_to_clickhouse.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+      sql = "select name,age from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    table = "seatunnel_console"
+    fields = ["name"]
+    username = "default"
+    password = ""
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file