You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/04/17 13:43:18 UTC

[incubator-seatunnel] branch dev updated: [Feature][connector] Add Clickhouse file sink on Flink engine (#1700)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0e731052 [Feature][connector] Add Clickhouse file sink on Flink engine (#1700)
0e731052 is described below

commit 0e731052da444451c59893734e68559c415a7924
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sun Apr 17 21:43:11 2022 +0800

    [Feature][connector] Add Clickhouse file sink on Flink engine (#1700)
    
    * [Feature][connector] Add clickhouse file sink on flink engine
    
    * Remove return value of sink
    
    * shuffle by shardkey to reduce file num
    
    * fix code style
---
 docs/en/connector/sink/ClickhouseFile.md           |   6 +-
 .../common/config/TypesafeConfigUtils.java         |   3 +
 ...ntHolder.java => ClickhouseFileCopyMethod.java} |  24 +-
 .../seatunnel/flink/clickhouse/pojo/IntHolder.java |   7 +-
 .../clickhouse/sink/ClickhouseFileBatchSink.java   | 188 +++++++++++++++
 .../sink/ClickhouseFileOutputFormat.java           | 262 +++++++++++++++++++++
 .../clickhouse/sink/client/ClickhouseClient.java   |  74 ++++++
 .../flink/clickhouse/sink/client/ShardRouter.java  |   5 +-
 .../clickhouse/sink/file/ClickhouseTable.java      | 117 +++++++++
 .../IntHolder.java => sink/file/FileTransfer.java} |  19 +-
 .../clickhouse/sink/file/ScpFileTransfer.java      | 115 +++++++++
 .../org.apache.seatunnel.flink.BaseFlinkSink       |   1 +
 .../flink/clickhouse/FakeSourceToClickhouseIT.java |   2 +-
 13 files changed, 802 insertions(+), 21 deletions(-)

diff --git a/docs/en/connector/sink/ClickhouseFile.md b/docs/en/connector/sink/ClickhouseFile.md
index 6fa70176..a4d40614 100644
--- a/docs/en/connector/sink/ClickhouseFile.md
+++ b/docs/en/connector/sink/ClickhouseFile.md
@@ -10,7 +10,7 @@ server, also call bulk load.
 Engine Supported and plugin name
 
 * [x] Spark: ClickhouseFile
-* [ ] Flink
+* [x] Flink
 
 :::
 
@@ -126,7 +126,7 @@ Sink plugin common parameters, please refer to [common options](common-options.m
 ## Examples
 
 ```bash
-clickhouse {
+ClickhouseFile {
     host = "localhost:8123"
     database = "nginx"
     table = "access_msg"
@@ -139,7 +139,7 @@ clickhouse {
 ```
 
 ```bash
-ClickHouse {
+ClickhouseFile {
     host = "localhost:8123"
     database = "nginx"
     table = "access_msg"
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index b172d3b9..777244b1 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -105,6 +105,9 @@ public final class TypesafeConfigUtils {
         if (defaultValue.getClass().equals(String.class)) {
             return config.hasPath(configKey) ? (T) config.getString(configKey) : defaultValue;
         }
+        if (defaultValue.getClass().equals(Boolean.class)) {
+            return config.hasPath(configKey) ? (T) Boolean.valueOf(config.getString(configKey)) : defaultValue;
+        }
         throw new RuntimeException("Unsupported config type, configKey: " + configKey);
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/ClickhouseFileCopyMethod.java
similarity index 57%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/ClickhouseFileCopyMethod.java
index 68e2178a..dfc7e66d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/ClickhouseFileCopyMethod.java
@@ -17,14 +17,26 @@
 
 package org.apache.seatunnel.flink.clickhouse.pojo;
 
-public class IntHolder {
-    private int value;
+/**
+ * Methods that can be configured for copying generated ClickHouse data files to a server node.
+ */
+public enum ClickhouseFileCopyMethod {
+    SCP("scp"),
+    RSYNC("rsync"),
+    ;
+    // Name of the copy method as it is written in the sink configuration.
+    private final String name;
 
-    public int getValue() {
-        return value;
+    ClickhouseFileCopyMethod(String name) {
+        this.name = name;
     }
 
-    public void setValue(int value) {
-        this.value = value;
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Resolves a copy method from its configured name, ignoring case.
+     *
+     * @param name configured name, e.g. "scp".
+     * @return the matching copy method.
+     * @throws IllegalArgumentException if no copy method has the given name.
+     */
+    public static ClickhouseFileCopyMethod from(String name) {
+        for (ClickhouseFileCopyMethod clickhouseFileCopyMethod : ClickhouseFileCopyMethod.values()) {
+            if (clickhouseFileCopyMethod.getName().equalsIgnoreCase(name)) {
+                return clickhouseFileCopyMethod;
+            }
+        }
+        throw new IllegalArgumentException("Unknown ClickhouseFileCopyMethod: " + name);
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
index 68e2178a..aa6e97b9 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
@@ -17,7 +17,12 @@
 
 package org.apache.seatunnel.flink.clickhouse.pojo;
 
-public class IntHolder {
+import java.io.Serializable;
+
+public class IntHolder implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
     private int value;
 
     public int getValue() {
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
new file mode 100644
index 00000000..573f3ec8
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink;
+
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.CLICKHOUSE_LOCAL_PATH;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.COPY_METHOD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.DATABASE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.FIELDS;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.HOST;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.PASSWORD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.SHARDING_KEY;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.TABLE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.clickhouse.pojo.ClickhouseFileCopyMethod;
+import org.apache.seatunnel.flink.clickhouse.pojo.IntHolder;
+import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import ru.yandex.clickhouse.ClickHouseConnection;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Flink batch sink registered under the plugin name "ClickhouseFile".
+ *
+ * <p>The data set is shuffled by the configured sharding key and every resulting partition is
+ * handed to a {@link ClickhouseFileOutputFormat}, which performs the actual write.
+ */
+public class ClickhouseFileBatchSink extends ClickhouseBatchSink {
+
+    private Config config;
+    // Shard key and target table description; built in prepare().
+    private ShardMetadata shardMetadata;
+    // Column name -> column type of the target table; loaded in prepare().
+    private Map<String, String> tableSchema = new HashMap<>();
+    // Columns to write; stays null when the FIELDS option is not configured.
+    private List<String> fields;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    /**
+     * Verifies that all mandatory options are present and applies the default copy method (scp).
+     *
+     * @return the failed check result when a mandatory option is missing, success otherwise.
+     */
+    @Override
+    public CheckResult checkConfig() {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH);
+        if (!checkResult.isSuccess()) {
+            return checkResult;
+        }
+        Map<String, Object> defaultConfigs = ImmutableMap.<String, Object>builder()
+            .put(COPY_METHOD, ClickhouseFileCopyMethod.SCP.getName())
+            .build();
+
+        config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
+        return CheckResult.success();
+    }
+
+    /**
+     * Loads the table schema from ClickHouse, builds the shard metadata and validates the
+     * configured field list against the schema.
+     *
+     * @throws IllegalArgumentException if the host option is not of the form host:port.
+     * @throws RuntimeException         if a configured field is missing from the table or the
+     *                                  ClickHouse server cannot be queried.
+     */
+    @Override
+    public void prepare(FlinkEnvironment env) {
+        ClickhouseClient clickhouseClient = new ClickhouseClient(config);
+        String table = config.getString(TABLE);
+        String database = config.getString(DATABASE);
+        String host = config.getString(HOST);
+        String[] hostAndPort = host.split(":");
+        if (hostAndPort.length < 2) {
+            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException below.
+            throw new IllegalArgumentException("Invalid clickhouse host '" + host + "', expected format is host:port");
+        }
+        try (ClickHouseConnection connection = clickhouseClient.getClickhouseConnection()) {
+            tableSchema = clickhouseClient.getClickhouseTableSchema(connection, table);
+            String shardKey = TypesafeConfigUtils.getConfig(this.config, SHARDING_KEY, "");
+            // null when no sharding key is configured or the key is not a column of the table
+            String shardKeyType = tableSchema.get(shardKey);
+            shardMetadata = new ShardMetadata(
+                shardKey,
+                shardKeyType,
+                database,
+                table,
+                false, // we don't need to set splitMode in clickhouse file mode.
+                new Shard(1, 1, 1, hostAndPort[0], hostAndPort[0], hostAndPort[1], database));
+
+            if (this.config.hasPath(FIELDS)) {
+                fields = this.config.getStringList(FIELDS);
+                // check if the fields exist in schema
+                for (String field : fields) {
+                    if (!tableSchema.containsKey(field)) {
+                        throw new RuntimeException("Field " + field + " does not exist in table " + table);
+                    }
+                }
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to connect to clickhouse server", e);
+        }
+    }
+
+    /**
+     * Shuffles the rows by shard key and writes every partition through a
+     * {@link ClickhouseFileOutputFormat}.
+     *
+     * <p>When no row field matches the shard key, the index holder keeps its initial value and
+     * the first field of the row is used as the partitioning key.
+     */
+    @Nullable
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
+        RowTypeInfo rowTypeInfo = (RowTypeInfo) dataSet.getType();
+        String[] fieldNames = rowTypeInfo.getFieldNames();
+        final IntHolder shardKeyIndexHolder = new IntHolder();
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (fieldNames[i].equals(shardMetadata.getShardKey())) {
+                shardKeyIndexHolder.setValue(i);
+                break;
+            }
+        }
+        final MapPartitionOperator<Row, Row> mapPartitionOperator = dataSet.partitionCustom(new Partitioner<String>() {
+            // make sure the data belongs to each shard shuffle to the same partition
+            @Override
+            public int partition(String shardKey, int numPartitions) {
+                // String#hashCode may be negative and Java's % keeps the sign of the dividend,
+                // so use floorMod to always obtain an index in [0, numPartitions).
+                return Math.floorMod(shardKey.hashCode(), numPartitions);
+            }
+        }, new KeySelector<Row, String>() {
+            @Override
+            public String getKey(Row value) {
+                int shardKeyIndex = shardKeyIndexHolder.getValue();
+                // Objects.toString maps a null field to the string "null"
+                return Objects.toString(value.getField(shardKeyIndex));
+            }
+        }).mapPartition(new MapPartitionFunction<Row, Row>() {
+            @Override
+            public void mapPartition(Iterable<Row> values, Collector<Row> out) throws Exception {
+                // nothing is emitted to the collector, the write is a side effect
+                new ClickhouseFileOutputFormat(config, shardMetadata, fields).writeRecords(values);
+            }
+        });
+
+        // This is just a dummy sink, since each flink job need to have a sink.
+        mapPartitionOperator.output(new OutputFormat<Row>() {
+
+            @Override
+            public void configure(Configuration parameters) {
+            }
+
+            @Override
+            public void open(int taskNumber, int numTasks) {
+            }
+
+            @Override
+            public void writeRecord(Row record) {
+            }
+
+            @Override
+            public void close() {
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        super.close();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "ClickhouseFile";
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
new file mode 100644
index 00000000..e5b293c2
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink;
+
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.CLICKHOUSE_LOCAL_PATH;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.COPY_METHOD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.DATABASE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.NODE_ADDRESS;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.NODE_FREE_PASSWORD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.NODE_PASS;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.PASSWORD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.TABLE;
+
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.clickhouse.pojo.ClickhouseFileCopyMethod;
+import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ShardRouter;
+import org.apache.seatunnel.flink.clickhouse.sink.file.ClickhouseTable;
+import org.apache.seatunnel.flink.clickhouse.sink.file.FileTransfer;
+import org.apache.seatunnel.flink.clickhouse.sink.file.ScpFileTransfer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.ClickHouseConnectionImpl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Writes rows into ClickHouse by generating data parts with the clickhouse-local binary,
+ * copying them into the {@code detached} directory of the target shard and attaching them
+ * to the local table.
+ */
+public class ClickhouseFileOutputFormat {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileOutputFormat.class);
+    // Every flush works in its own sub directory <prefix>/<uuid> below this directory.
+    private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/flink-file";
+    // Number of characters of the random UUID used as working directory name.
+    private static final int UUID_LENGTH = 10;
+
+    private final Config config;
+    // Command used to launch clickhouse-local; split on spaces when the command line is built.
+    private final String clickhouseLocalPath;
+    private final List<String> fields;
+    private final ShardMetadata shardMetadata;
+    private final ClickhouseFileCopyMethod clickhouseFileCopyMethod;
+    // Node address (host address or host name) -> password; empty in password-free mode.
+    private final Map<String, String> nodePassword;
+
+    private final ClickhouseClient clickhouseClient;
+    private final ShardRouter shardRouter;
+    private final ClickhouseTable clickhouseTable;
+    // Column name -> column type of the configured table.
+    private final Map<String, String> schemaMap;
+    // Data paths of the local table on every shard.
+    private final Map<Shard, List<String>> shardLocalDataPaths;
+
+    // In most of the case, the data has been already shuffled in ClickhouseFileBatchSink#outputBatch
+    private final Map<Shard, List<Row>> rowCache;
+
+    /**
+     * Reads the sink options and loads table, schema and shard information from ClickHouse.
+     *
+     * @param config        sink configuration.
+     * @param shardMetadata shard key and table description used for routing rows.
+     * @param fields        columns to write.
+     * @throws RuntimeException if password-free mode is disabled and a shard has no configured password.
+     */
+    public ClickhouseFileOutputFormat(Config config, ShardMetadata shardMetadata, List<String> fields) throws IOException {
+        this.config = config;
+        this.clickhouseLocalPath = config.getString(CLICKHOUSE_LOCAL_PATH);
+        this.shardMetadata = shardMetadata;
+        this.fields = fields;
+        this.clickhouseFileCopyMethod = ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD));
+        boolean nodeFreePassword = TypesafeConfigUtils.getConfig(config, NODE_FREE_PASSWORD, true);
+        if (nodeFreePassword) {
+            this.nodePassword = Collections.emptyMap();
+        } else {
+            nodePassword = config.getObjectList(NODE_PASS).stream()
+                .collect(Collectors.toMap(
+                    configObject -> configObject.toConfig().getString(NODE_ADDRESS),
+                    configObject -> configObject.toConfig().getString(PASSWORD)));
+        }
+        clickhouseClient = new ClickhouseClient(config);
+        shardRouter = new ShardRouter(clickhouseClient, shardMetadata);
+        clickhouseTable = clickhouseClient.getClickhouseTable(config.getString(DATABASE), config.getString(TABLE));
+        schemaMap = clickhouseClient.getClickhouseTableSchema(config.getString(TABLE));
+
+        rowCache = new HashMap<>(shardRouter.getShards().keySet().size());
+        if (!nodeFreePassword) {
+            // a password may be registered under the host address or the host name of the shard
+            shardRouter.getShards().values().forEach(shard -> {
+                if (!nodePassword.containsKey(shard.getHostAddress()) && !nodePassword.containsKey(shard.getHostname())) {
+                    throw new RuntimeException("Cannot find password of shard " + shard.getHostAddress());
+                }
+            });
+        }
+        shardLocalDataPaths = shardRouter.getShards().values().stream()
+            .collect(Collectors.toMap(
+                Function.identity(),
+                shard -> {
+                    ClickhouseTable shardTable = clickhouseClient
+                        .getClickhouseTable(shard.getDatabase(), clickhouseTable.getLocalTableName());
+                    return shardTable.getDataPaths();
+                }));
+    }
+
+    /**
+     * Groups the records by target shard and flushes every group to its shard.
+     *
+     * @param records rows to write; all of them are buffered in memory before flushing.
+     */
+    public void writeRecords(Iterable<Row> records) {
+        for (Row record : records) {
+            Shard shard = shardRouter.getShard(record);
+            rowCache.computeIfAbsent(shard, k -> new ArrayList<>()).add(record);
+        }
+        for (Map.Entry<Shard, List<Row>> entry : rowCache.entrySet()) {
+            Shard shard = entry.getKey();
+            List<Row> rows = entry.getValue();
+            flush(shard, rows);
+            rows.clear();
+        }
+    }
+
+    /**
+     * Generates the data parts for one shard, ships and attaches them, then removes the
+     * local working directory.
+     */
+    private void flush(Shard shard, List<Row> rows) {
+        try {
+            // generate clickhouse local file
+            List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(shard, rows);
+            // move file to server
+            attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+            // clear local file
+            clearLocalFileDirectory(clickhouseLocalFiles);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread handling
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Flush data into clickhouse file error", e);
+        } catch (Exception e) {
+            throw new RuntimeException("Flush data into clickhouse file error", e);
+        }
+    }
+
+    /**
+     * Dumps the rows into a tab separated file and runs clickhouse-local on it to produce
+     * data parts of the local table.
+     *
+     * @return absolute paths of the generated part directories; empty when {@code rows} is empty.
+     * @throws RuntimeException if clickhouse-local fails or produces no part directory.
+     */
+    private List<String> generateClickhouseLocalFiles(Shard shard, List<Row> rows) throws IOException, InterruptedException {
+        if (CollectionUtils.isEmpty(rows)) {
+            return Collections.emptyList();
+        }
+        String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_");
+        String clickhouseLocalFile = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid);
+        FileUtils.forceMkdir(new File(clickhouseLocalFile));
+        String clickhouseLocalFileTmpFile = clickhouseLocalFile + "/local_data.log";
+        String data = rows.stream()
+            .map(row -> fields.stream().map(field -> row.getField(field).toString())
+                .collect(Collectors.joining("\t")))
+            .collect(Collectors.joining("\n"));
+        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
+        // try-with-resources releases the file handle even when mapping or writing fails
+        try (FileChannel fileChannel = FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE,
+            StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
+            MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
+                dataBytes.length);
+            buffer.put(dataBytes);
+            // write the mapped content back to the file before the child process reads it
+            buffer.force();
+        }
+
+        List<String> command = new ArrayList<>();
+        command.add("cat");
+        command.add(clickhouseLocalFileTmpFile);
+        command.add("|");
+
+        command.addAll(Arrays.stream(clickhouseLocalPath.trim().split(" ")).collect(Collectors.toList()));
+        command.add("local");
+        command.add("-S");
+        command.add("\"" + fields.stream().map(field -> field + " " + schemaMap.get(field)).collect(Collectors.joining(",")) + "\"");
+        command.add("-N");
+        command.add("\"" + "temp_table" + uuid + "\"");
+        command.add("-q");
+        command.add(String.format(
+            "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
+            clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() + ".", "").replaceAll("`", ""),
+            clickhouseTable.getLocalTableName(),
+            // columns that are not written are filled with NULL
+            schemaMap.entrySet().stream().map(entry -> {
+                if (fields.contains(entry.getKey())) {
+                    return entry.getKey();
+                } else {
+                    return "NULL";
+                }
+            }).collect(Collectors.joining(",")),
+            uuid));
+        command.add("--path");
+        command.add("\"" + clickhouseLocalFile + "\"");
+        LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command));
+        ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command));
+        // merge stderr into stdout so that the child can never block on an unread error pipe
+        processBuilder.redirectErrorStream(true);
+        Process start = processBuilder.start();
+        // we just wait for the process to finish
+        try (InputStream inputStream = start.getInputStream();
+             InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+             BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
+            String line;
+            while ((line = bufferedReader.readLine()) != null) {
+                LOGGER.info(line);
+            }
+        }
+        int exitCode = start.waitFor();
+        if (exitCode != 0) {
+            throw new RuntimeException("clickhouse local exited with code " + exitCode);
+        }
+        File file = new File(clickhouseLocalFile + "/data/_local/" + clickhouseTable.getLocalTableName());
+        if (!file.exists()) {
+            throw new RuntimeException("clickhouse local file not exists");
+        }
+        File[] files = file.listFiles();
+        if (files == null) {
+            throw new RuntimeException("clickhouse local file not exists");
+        }
+        List<String> partDirectories = Arrays.stream(files)
+            .filter(File::isDirectory)
+            .filter(f -> !"detached".equals(f.getName()))
+            .map(File::getAbsolutePath).collect(Collectors.toList());
+        if (partDirectories.isEmpty()) {
+            // rows were given but nothing was produced: fail here with a clear message
+            throw new RuntimeException("clickhouse local generated no data part in " + file.getAbsolutePath());
+        }
+        return partDirectories;
+    }
+
+    /**
+     * Copies the part directories into the {@code detached} directory of the shard and
+     * attaches every part to the local table.
+     *
+     * @throws RuntimeException if the copy method is not supported or a part cannot be attached.
+     */
+    private void attachClickhouseLocalFileToServer(Shard shard, List<String> clickhouseLocalFiles) {
+        if (ClickhouseFileCopyMethod.SCP.equals(clickhouseFileCopyMethod)) {
+            String hostAddress = shard.getHostAddress();
+            // the constructor accepts a password keyed by host address or by host name
+            String password = nodePassword.containsKey(hostAddress)
+                ? nodePassword.get(hostAddress)
+                : nodePassword.get(shard.getHostname());
+            FileTransfer fileTransfer = new ScpFileTransfer(hostAddress, password);
+            fileTransfer.init();
+            try {
+                fileTransfer.transferAndChown(clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) + "detached/");
+            } finally {
+                fileTransfer.close();
+            }
+        } else {
+            throw new RuntimeException("unsupported clickhouse file copy method " + clickhouseFileCopyMethod);
+        }
+
+        try (ClickHouseConnectionImpl clickhouseConnection = clickhouseClient.getClickhouseConnection(shard)) {
+            for (String clickhouseLocalFile : clickhouseLocalFiles) {
+                // the part name is the last path element of the part directory
+                clickhouseConnection.createStatement()
+                    .execute(String.format("ALTER TABLE %s ATTACH PART '%s'",
+                        clickhouseTable.getLocalTableName(),
+                        clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 1)));
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Unable to attach clickhouse local file to shard " + shard.getHostAddress(), e);
+        }
+    }
+
+    /**
+     * Deletes the working directory ({@code <prefix>/<uuid>}) that contains the given part directories.
+     */
+    private void clearLocalFileDirectory(List<String> clickhouseLocalFiles) {
+        if (clickhouseLocalFiles.isEmpty()) {
+            // nothing was generated, so there is no working directory to derive and delete
+            return;
+        }
+        String clickhouseLocalFile = clickhouseLocalFiles.get(0);
+        // prefix + '/' + uuid
+        String localFileDir = clickhouseLocalFile.substring(0, CLICKHOUSE_LOCAL_FILE_PREFIX.length() + UUID_LENGTH + 1);
+        try {
+            File file = new File(localFileDir);
+            if (file.exists()) {
+                FileUtils.deleteDirectory(file);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to delete directory " + localFileDir, e);
+        }
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
index c14b99c6..2e6d121d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
@@ -26,13 +26,17 @@ import static org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.flink.clickhouse.pojo.DistributedEngine;
 import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+import org.apache.seatunnel.flink.clickhouse.sink.file.ClickhouseTable;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
 import ru.yandex.clickhouse.BalancedClickhouseDataSource;
 import ru.yandex.clickhouse.ClickHouseConnection;
 import ru.yandex.clickhouse.ClickHouseConnectionImpl;
 import ru.yandex.clickhouse.ClickHouseStatement;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -42,6 +46,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 @SuppressWarnings("magicnumber")
@@ -49,6 +54,8 @@ public class ClickhouseClient {
 
     private final BalancedClickhouseDataSource balancedClickhouseDataSource;
 
+    private Map<Shard, BalancedClickhouseDataSource> shardToDataSource = new ConcurrentHashMap<>(16);
+
     public ClickhouseClient(Config config) {
         Properties clickhouseProperties = new Properties();
         if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) {
@@ -70,6 +77,18 @@ public class ClickhouseClient {
         }
     }
 
+    /**
+     * Opens a connection to the given shard.
+     *
+     * <p>One data source is created per shard on first use, with the properties of the
+     * default data source, and is reused for later calls.
+     *
+     * @param shard shard to connect to.
+     * @return a new connection to the shard.
+     * @throws RuntimeException if the connection cannot be established.
+     */
+    public ClickHouseConnectionImpl getClickhouseConnection(Shard shard) {
+        BalancedClickhouseDataSource shardDatasource = shardToDataSource.computeIfAbsent(shard, s -> {
+            ClickHouseProperties properties = this.balancedClickhouseDataSource.getProperties();
+            return new BalancedClickhouseDataSource(s.getJdbcUrl(), properties);
+        });
+        try {
+            return (ClickHouseConnectionImpl) shardDatasource.getConnection();
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot connect to target shard " + shard.getJdbcUrl(), e);
+        }
+    }
+
     public DistributedEngine getClickhouseDistributedTable(String database, String table) {
         try (ClickHouseConnection connection = getClickhouseConnection()) {
             return getClickhouseDistributedTable(connection, database, table);
@@ -95,6 +114,12 @@ public class ClickhouseClient {
         }
     }
 
+    /**
+     * Get ClickHouse table schema, the key is field name, value is value type.
+     *
+     * @param table table name.
+     * @return schema map.
+     */
     public Map<String, String> getClickhouseTableSchema(String table) {
         try (ClickHouseConnection connection = getClickhouseConnection()) {
             return getClickhouseTableSchema(connection, table);
@@ -117,6 +142,15 @@ public class ClickhouseClient {
         return schema;
     }
 
+    /**
+     * Get the shard of the given cluster.
+     *
+     * @param connection  clickhouse connection.
+     * @param clusterName cluster name.
+     * @param database    database of the shard.
+     * @param port        port of the shard.
+     * @return shard list.
+     */
     public List<Shard> getClusterShardList(ClickHouseConnection connection, String clusterName, String database, String port) {
         String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "'";
         List<Shard> shardList = new ArrayList<>();
@@ -138,4 +172,44 @@ public class ClickhouseClient {
         }
     }
 
+    /**
+     * Load the metadata of a ClickHouse table from {@code system.tables}.
+     * The row is selected by database and table name.
+     *
+     * @param database database that owns the table.
+     * @param table    name of the table to look up.
+     * @return descriptor built from the first row returned for that table.
+     * @throws RuntimeException if no row is returned or the query fails with a SQLException.
+     */
+    public ClickhouseTable getClickhouseTable(String database, String table) {
+        String query = String.format("select engine,create_table_query,engine_full,data_paths from system.tables where database = '%s' and name = '%s'", database, table);
+        try (ClickHouseConnection conn = balancedClickhouseDataSource.getConnection();
+             ClickHouseStatement stmt = conn.createStatement()) {
+            ResultSet rows = stmt.executeQuery(query);
+            if (!rows.next()) {
+                throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty");
+            }
+
+            String tableEngine = rows.getString(1);
+            String ddl = rows.getString(2);
+            String fullEngine = rows.getString(3);
+            // swap single quotes for double quotes so data_paths can be parsed as a JSON string array
+            String dataPathsJson = rows.getString(4).replaceAll("'", "\"");
+            List<String> paths = JSON.parseObject(dataPathsJson, new TypeReference<List<String>>() {
+            });
+            // distributed-engine details are only resolved for Distributed tables, otherwise null is passed on
+            DistributedEngine distributed = "Distributed".equals(tableEngine)
+                ? getClickhouseDistributedTable(conn, database, table)
+                : null;
+            Map<String, String> schema = getClickhouseTableSchema(conn, table);
+            return new ClickhouseTable(
+                database, table, distributed,
+                tableEngine, ddl, fullEngine,
+                paths, schema);
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot get clickhouse table", e);
+        }
+
+    }
+
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java
index e6ad4438..cfbeb13c 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.types.Row;
 import ru.yandex.clickhouse.ClickHouseConnection;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
@@ -34,7 +35,9 @@ import java.util.List;
 import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 
-public class ShardRouter {
+public class ShardRouter implements Serializable {
+
+    private static final long serialVersionUID = -1L;
 
     private String shardTable;
     private final String table;
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ClickhouseTable.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ClickhouseTable.java
new file mode 100644
index 00000000..fcbccfd6
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ClickhouseTable.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink.file;
+
+import org.apache.seatunnel.flink.clickhouse.pojo.DistributedEngine;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes a ClickHouse table: its database, name, engine, DDL, data paths and schema.
+ * An optional {@link DistributedEngine} is kept to resolve the local table name.
+ */
+public class ClickhouseTable {
+
+    private String database;
+    private String tableName;
+    private String engine;
+    private String engineFull;
+    private String createTableDDL;
+    private List<String> dataPaths;
+    private DistributedEngine distributedEngine;
+    private Map<String, String> tableSchema;
+
+    /** Builds a table description from the given values; {@code distributedEngine} may be null. */
+    public ClickhouseTable(String database, String tableName, DistributedEngine distributedEngine,
+                           String engine, String createTableDDL, String engineFull,
+                           List<String> dataPaths, Map<String, String> tableSchema) {
+        this.database = database;
+        this.tableName = tableName;
+        this.engine = engine;
+        this.engineFull = engineFull;
+        this.createTableDDL = createTableDDL;
+        this.dataPaths = dataPaths;
+        this.distributedEngine = distributedEngine;
+        this.tableSchema = tableSchema;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public String getEngine() {
+        return engine;
+    }
+
+    public void setEngine(String engine) {
+        this.engine = engine;
+    }
+
+    public String getEngineFull() {
+        return engineFull;
+    }
+
+    public void setEngineFull(String engineFull) {
+        this.engineFull = engineFull;
+    }
+
+    public String getCreateTableDDL() {
+        return createTableDDL;
+    }
+
+    public void setCreateTableDDL(String createTableDDL) {
+        this.createTableDDL = createTableDDL;
+    }
+
+    public List<String> getDataPaths() {
+        return dataPaths;
+    }
+
+    public void setDataPaths(List<String> dataPaths) {
+        this.dataPaths = dataPaths;
+    }
+
+    public Map<String, String> getTableSchema() {
+        return tableSchema;
+    }
+
+    public void setTableSchema(Map<String, String> tableSchema) {
+        this.tableSchema = tableSchema;
+    }
+
+    /**
+     * Resolve the name of the local table.
+     * @return the distributed engine's table if a distributed engine is set, otherwise the table name.
+     */
+    public String getLocalTableName() {
+        return distributedEngine != null ? distributedEngine.getTable() : tableName;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/FileTransfer.java
similarity index 73%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/FileTransfer.java
index 68e2178a..e84d2393 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/FileTransfer.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.flink.clickhouse.pojo;
+package org.apache.seatunnel.flink.clickhouse.sink.file;
 
-public class IntHolder {
-    private int value;
+import java.util.List;
 
-    public int getValue() {
-        return value;
-    }
+public interface FileTransfer {
 
-    public void setValue(int value) {
-        this.value = value;
-    }
+    void init(); // prepare the transfer before first use, e.g. ScpFileTransfer opens its SSH session here
+
+    void transferAndChown(String sourcePath, String targetPath); // copy sourcePath to targetPath, then change the owner
+
+    void transferAndChown(List<String> sourcePath, String targetPath); // same as above for each path in the list
+
+    void close(); // release what init() set up, e.g. ScpFileTransfer closes its SSH session and client
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ScpFileTransfer.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ScpFileTransfer.java
new file mode 100644
index 00000000..0b1c58cc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ScpFileTransfer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink.file;
+
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.scp.client.ScpClient;
+import org.apache.sshd.scp.client.ScpClientCreator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link FileTransfer} that copies files to a remote host with SCP.
+ * The SSH connection is always opened as user {@code root} on port 22. After each upload the
+ * target path is chowned to the owner of the last entry that {@code ls -l} reports for the
+ * directory obtained by cutting the target path at its last {@code /}.
+ */
+public class ScpFileTransfer implements FileTransfer {
+
+    private static final int SCP_PORT = 22;
+    private static final String SSH_USER = "root";
+
+    private final String host;
+    private final String password;
+
+    private ScpClient scpClient;
+    private ClientSession clientSession;
+    private SshClient sshClient;
+
+    /**
+     * @param host     host to copy the files to.
+     * @param password password of the remote root user, or {@code null} to authenticate without one.
+     */
+    public ScpFileTransfer(String host, String password) {
+        this.host = host;
+        this.password = password;
+    }
+
+    /**
+     * Start the SSH client, open and authenticate the session and create the SCP client.
+     *
+     * @throws RuntimeException if connecting or authenticating fails; what was opened so far is closed first.
+     */
+    @Override
+    public void init() {
+        try {
+            sshClient = SshClient.setUpDefaultClient();
+            sshClient.start();
+            clientSession = sshClient.connect(SSH_USER, host, SCP_PORT).verify().getSession();
+            if (password != null) {
+                clientSession.addPasswordIdentity(password);
+            }
+            if (!clientSession.auth().verify().isSuccess()) {
+                throw new IOException("ssh host " + host + " authentication failed");
+            }
+            scpClient = ScpClientCreator.instance().createScpClient(clientSession);
+        } catch (IOException e) {
+            // a failed init must not leave the started client or a half-open session behind
+            try {
+                close();
+            } catch (RuntimeException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw new RuntimeException(
+                "Failed to connect to host: " + host + " by user: " + SSH_USER + " on port " + SCP_PORT, e);
+        }
+    }
+
+    /**
+     * Upload {@code sourcePath} recursively into {@code targetPath}, then change the owner of {@code targetPath}.
+     *
+     * @param sourcePath local path to upload.
+     * @param targetPath remote directory to upload into.
+     * @throws RuntimeException if the upload or the remote chown command fails.
+     */
+    @Override
+    public void transferAndChown(String sourcePath, String targetPath) {
+        try {
+            scpClient.upload(
+                sourcePath,
+                targetPath,
+                ScpClient.Option.Recursive,
+                ScpClient.Option.TargetIsDirectory,
+                ScpClient.Option.PreserveAttributes);
+        } catch (IOException e) {
+            throw new RuntimeException("Scp failed to transfer file: " + sourcePath + " to: " + targetPath, e);
+        }
+        // remote exec command to change file owner. Only file owner equal with server's clickhouse user can
+        // make ATTACH command work.
+        List<String> command = new ArrayList<>();
+        command.add("ls");
+        command.add("-l");
+        // The "/" belongs to the parent path. As a separate word (the parts are joined with spaces) it made
+        // ls list the root directory as well, so the owner could be read from the wrong listing.
+        command.add(targetPath.substring(0, targetPath.lastIndexOf("/")) + "/");
+        command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
+        try {
+            clientSession.executeRemoteCommand(String.join(" ", command));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to execute remote command: " + command, e);
+        }
+    }
+
+    /**
+     * Upload every path of {@code sourcePaths} with {@link #transferAndChown(String, String)}.
+     *
+     * @param sourcePaths local paths to upload.
+     * @param targetPath  remote directory to upload into.
+     * @throws IllegalArgumentException if {@code sourcePaths} is null.
+     */
+    @Override
+    public void transferAndChown(List<String> sourcePaths, String targetPath) {
+        if (sourcePaths == null) {
+            throw new IllegalArgumentException("sourcePath is null");
+        }
+        sourcePaths.forEach(sourcePath -> transferAndChown(sourcePath, targetPath));
+    }
+
+    /**
+     * Close the SSH session and stop the SSH client.
+     * Both are attempted even if the first one fails; the first failure is thrown afterwards.
+     *
+     * @throws RuntimeException if closing the session or the client fails.
+     */
+    @Override
+    public void close() {
+        RuntimeException failure = null;
+        if (clientSession != null && clientSession.isOpen()) {
+            try {
+                clientSession.close();
+            } catch (IOException e) {
+                failure = new RuntimeException("Failed to close ssh session", e);
+            }
+        }
+        if (sshClient != null && sshClient.isOpen()) {
+            sshClient.stop();
+            try {
+                sshClient.close();
+            } catch (IOException e) {
+                RuntimeException clientFailure = new RuntimeException("Failed to close ssh client", e);
+                if (failure == null) {
+                    failure = clientFailure;
+                } else {
+                    failure.addSuppressed(clientFailure);
+                }
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
index 35104c33..acd3d6a5 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
@@ -16,3 +16,4 @@
 #
 
 org.apache.seatunnel.flink.clickhouse.sink.ClickhouseBatchSink
+org.apache.seatunnel.flink.clickhouse.sink.ClickhouseFileBatchSink
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
index 9a349712..adbea780 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
@@ -45,7 +45,7 @@ public class FakeSourceToClickhouseIT extends FlinkContainer {
 
     private GenericContainer<?> clickhouseServer;
     private BalancedClickhouseDataSource dataSource;
-    private static final String CLICKHOUSE_DOCKER_IMAGE = "yandex/clickhouse-server:21.3.20.1";
+    private static final String CLICKHOUSE_DOCKER_IMAGE = "yandex/clickhouse-server:22.1.3.7";
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToClickhouseIT.class);