You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/04/17 13:43:18 UTC
[incubator-seatunnel] branch dev updated: [Feature][connector] Add Clickhouse file sink on Flink engine (#1700)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0e731052 [Feature][connector] Add Clickhouse file sink on Flink engine (#1700)
0e731052 is described below
commit 0e731052da444451c59893734e68559c415a7924
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sun Apr 17 21:43:11 2022 +0800
[Feature][connector] Add Clickhouse file sink on Flink engine (#1700)
* [Feature][connector] Add clickhouse file sink on flink engine
* Remove return value of sink
* shuffle by shardkey to reduce file num
* fix code style
---
docs/en/connector/sink/ClickhouseFile.md | 6 +-
.../common/config/TypesafeConfigUtils.java | 3 +
...ntHolder.java => ClickhouseFileCopyMethod.java} | 24 +-
.../seatunnel/flink/clickhouse/pojo/IntHolder.java | 7 +-
.../clickhouse/sink/ClickhouseFileBatchSink.java | 188 +++++++++++++++
.../sink/ClickhouseFileOutputFormat.java | 262 +++++++++++++++++++++
.../clickhouse/sink/client/ClickhouseClient.java | 74 ++++++
.../flink/clickhouse/sink/client/ShardRouter.java | 5 +-
.../clickhouse/sink/file/ClickhouseTable.java | 117 +++++++++
.../IntHolder.java => sink/file/FileTransfer.java} | 19 +-
.../clickhouse/sink/file/ScpFileTransfer.java | 115 +++++++++
.../org.apache.seatunnel.flink.BaseFlinkSink | 1 +
.../flink/clickhouse/FakeSourceToClickhouseIT.java | 2 +-
13 files changed, 802 insertions(+), 21 deletions(-)
diff --git a/docs/en/connector/sink/ClickhouseFile.md b/docs/en/connector/sink/ClickhouseFile.md
index 6fa70176..a4d40614 100644
--- a/docs/en/connector/sink/ClickhouseFile.md
+++ b/docs/en/connector/sink/ClickhouseFile.md
@@ -10,7 +10,7 @@ server, also call bulk load.
Engine Supported and plugin name
* [x] Spark: ClickhouseFile
-* [ ] Flink
+* [x] Flink
:::
@@ -126,7 +126,7 @@ Sink plugin common parameters, please refer to [common options](common-options.m
## Examples
```bash
-clickhouse {
+ClickhouseFile {
host = "localhost:8123"
database = "nginx"
table = "access_msg"
@@ -139,7 +139,7 @@ clickhouse {
```
```bash
-ClickHouse {
+ClickhouseFile {
host = "localhost:8123"
database = "nginx"
table = "access_msg"
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index b172d3b9..777244b1 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -105,6 +105,9 @@ public final class TypesafeConfigUtils {
if (defaultValue.getClass().equals(String.class)) {
return config.hasPath(configKey) ? (T) config.getString(configKey) : defaultValue;
}
+ if (defaultValue.getClass().equals(Boolean.class)) {
+ return config.hasPath(configKey) ? (T) Boolean.valueOf(config.getString(configKey)) : defaultValue;
+ }
throw new RuntimeException("Unsupported config type, configKey: " + configKey);
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/ClickhouseFileCopyMethod.java
similarity index 57%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/ClickhouseFileCopyMethod.java
index 68e2178a..dfc7e66d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/ClickhouseFileCopyMethod.java
@@ -17,14 +17,26 @@
package org.apache.seatunnel.flink.clickhouse.pojo;
-public class IntHolder {
- private int value;
+public enum ClickhouseFileCopyMethod {
+ SCP("scp"),
+ RSYNC("rsync"),
+ ;
+ private final String name;
- public int getValue() {
- return value;
+ ClickhouseFileCopyMethod(String name) {
+ this.name = name;
}
- public void setValue(int value) {
- this.value = value;
+ public String getName() {
+ return name;
+ }
+
+ public static ClickhouseFileCopyMethod from(String name) {
+ for (ClickhouseFileCopyMethod clickhouseFileCopyMethod : ClickhouseFileCopyMethod.values()) {
+ if (clickhouseFileCopyMethod.getName().equalsIgnoreCase(name)) {
+ return clickhouseFileCopyMethod;
+ }
+ }
+ throw new IllegalArgumentException("Unknown ClickhouseFileCopyMethod: " + name);
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
index 68e2178a..aa6e97b9 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
@@ -17,7 +17,12 @@
package org.apache.seatunnel.flink.clickhouse.pojo;
-public class IntHolder {
+import java.io.Serializable;
+
+public class IntHolder implements Serializable {
+
+ private static final long serialVersionUID = -1L;
+
private int value;
public int getValue() {
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
new file mode 100644
index 00000000..573f3ec8
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink;
+
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.CLICKHOUSE_LOCAL_PATH;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.COPY_METHOD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.DATABASE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.FIELDS;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.HOST;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.PASSWORD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.SHARDING_KEY;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.TABLE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.clickhouse.pojo.ClickhouseFileCopyMethod;
+import org.apache.seatunnel.flink.clickhouse.pojo.IntHolder;
+import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import ru.yandex.clickhouse.ClickHouseConnection;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Flink batch sink registered under the plugin name {@code ClickhouseFile}.
+ *
+ * <p>The sink shuffles the incoming rows by the configured sharding key and hands each
+ * partition to a {@link ClickhouseFileOutputFormat}, which performs the actual write.
+ * The Flink {@link OutputFormat} attached at the end of the pipeline is a no-op that only
+ * exists because every Flink job needs a sink.
+ */
+public class ClickhouseFileBatchSink extends ClickhouseBatchSink {
+
+    private Config config;
+    private ShardMetadata shardMetadata;
+    private Map<String, String> tableSchema = new HashMap<>();
+    // NOTE(review): stays null when FIELDS is not configured, and ClickhouseFileOutputFormat
+    // streams over it without a null check - confirm whether FIELDS must be mandatory.
+    private List<String> fields;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    /**
+     * Verifies that all mandatory options are present and applies the default copy method
+     * ({@code scp}) when {@code COPY_METHOD} is not configured.
+     *
+     * @return the failed check result if an option is missing, otherwise a success result.
+     */
+    @Override
+    public CheckResult checkConfig() {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH);
+        if (!checkResult.isSuccess()) {
+            return checkResult;
+        }
+        Map<String, Object> defaultConfigs = ImmutableMap.<String, Object>builder()
+            .put(COPY_METHOD, ClickhouseFileCopyMethod.SCP.getName())
+            .build();
+
+        config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
+        return CheckResult.success();
+    }
+
+    /**
+     * Loads the table schema from ClickHouse, builds the {@link ShardMetadata} used for routing
+     * and validates that every configured field exists in the table.
+     *
+     * @param env flink environment, not used by this sink.
+     * @throws IllegalArgumentException if the configured host is not in {@code host:port} form.
+     * @throws RuntimeException         if a configured field is missing from the table or the
+     *                                  ClickHouse server cannot be queried.
+     */
+    @Override
+    public void prepare(FlinkEnvironment env) {
+        ClickhouseClient clickhouseClient = new ClickhouseClient(config);
+        String table = config.getString(TABLE);
+        String database = config.getString(DATABASE);
+        String host = config.getString(HOST);
+        String[] hostAndPort = host.split(":");
+        if (hostAndPort.length < 2) {
+            // Fail early with a readable message instead of an ArrayIndexOutOfBoundsException below.
+            throw new IllegalArgumentException("Invalid host '" + host + "', expected format is host:port");
+        }
+        try (ClickHouseConnection connection = clickhouseClient.getClickhouseConnection()) {
+            tableSchema = clickhouseClient.getClickhouseTableSchema(connection, table);
+            String shardKey = TypesafeConfigUtils.getConfig(this.config, SHARDING_KEY, "");
+            // Null when no sharding key is configured or the key is not a column of the table.
+            String shardKeyType = tableSchema.get(shardKey);
+            shardMetadata = new ShardMetadata(
+                shardKey,
+                shardKeyType,
+                database,
+                table,
+                false, // we don't need to set splitMode in clickhouse file mode.
+                new Shard(1, 1, 1, hostAndPort[0], hostAndPort[0], hostAndPort[1], database));
+
+            if (this.config.hasPath(FIELDS)) {
+                fields = this.config.getStringList(FIELDS);
+                // check if the fields exist in schema
+                for (String field : fields) {
+                    if (!tableSchema.containsKey(field)) {
+                        throw new RuntimeException("Field " + field + " does not exist in table " + table);
+                    }
+                }
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to connect to clickhouse server", e);
+        }
+    }
+
+    /**
+     * Shuffles the data set by sharding key and writes every partition through a
+     * {@link ClickhouseFileOutputFormat}.
+     *
+     * @param env     flink environment, not used by this sink.
+     * @param dataSet rows to write; its type information must be a {@link RowTypeInfo}.
+     */
+    @Nullable
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
+        RowTypeInfo rowTypeInfo = (RowTypeInfo) dataSet.getType();
+        String[] fieldNames = rowTypeInfo.getFieldNames();
+        // Index of the sharding key column; it stays 0 when no column matches the sharding key.
+        final IntHolder shardKeyIndexHolder = new IntHolder();
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (fieldNames[i].equals(shardMetadata.getShardKey())) {
+                shardKeyIndexHolder.setValue(i);
+                break;
+            }
+        }
+        final MapPartitionOperator<Row, Row> mapPartitionOperator = dataSet.partitionCustom(new Partitioner<String>() {
+            // make sure the data belonging to the same shard is shuffled to the same partition
+            @Override
+            public int partition(String shardKey, int numPartitions) {
+                // hashCode() may be negative and '%' keeps the sign, which would yield an
+                // invalid (negative) partition index; floorMod always stays in [0, numPartitions).
+                return Math.floorMod(shardKey.hashCode(), numPartitions);
+            }
+        }, new KeySelector<Row, String>() {
+            @Override
+            public String getKey(Row value) {
+                int shardKeyIndex = shardKeyIndexHolder.getValue();
+                // Objects.toString maps a null field to the string "null" instead of failing.
+                return Objects.toString(value.getField(shardKeyIndex));
+            }
+        }).mapPartition(new MapPartitionFunction<Row, Row>() {
+            @Override
+            public void mapPartition(Iterable<Row> values, Collector<Row> out) throws Exception {
+                // Nothing is emitted to the collector; the write happens as a side effect.
+                new ClickhouseFileOutputFormat(config, shardMetadata, fields).writeRecords(values);
+            }
+        });
+
+        // This is just a dummy sink, since each flink job needs to have a sink.
+        mapPartitionOperator.output(new OutputFormat<Row>() {
+
+            @Override
+            public void configure(Configuration parameters) {
+            }
+
+            @Override
+            public void open(int taskNumber, int numTasks) {
+            }
+
+            @Override
+            public void writeRecord(Row record) {
+            }
+
+            @Override
+            public void close() {
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        super.close();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "ClickhouseFile";
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
new file mode 100644
index 00000000..e5b293c2
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink;
+
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.CLICKHOUSE_LOCAL_PATH;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.COPY_METHOD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.DATABASE;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.NODE_ADDRESS;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.NODE_FREE_PASSWORD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.NODE_PASS;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.PASSWORD;
+import static org.apache.seatunnel.flink.clickhouse.ConfigKey.TABLE;
+
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.clickhouse.pojo.ClickhouseFileCopyMethod;
+import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
+import org.apache.seatunnel.flink.clickhouse.sink.client.ShardRouter;
+import org.apache.seatunnel.flink.clickhouse.sink.file.ClickhouseTable;
+import org.apache.seatunnel.flink.clickhouse.sink.file.FileTransfer;
+import org.apache.seatunnel.flink.clickhouse.sink.file.ScpFileTransfer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.ClickHouseConnectionImpl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Writes rows into ClickHouse by generating data parts locally and attaching them to the
+ * target shard.
+ *
+ * <p>For every shard the rows are dumped into a tab separated file, piped through the
+ * {@code local} mode of the configured ClickHouse binary, copied into the {@code detached/}
+ * directory of the shard's local table and finally attached with
+ * {@code ALTER TABLE ... ATTACH PART}.
+ */
+public class ClickhouseFileOutputFormat {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileOutputFormat.class);
+    private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/flink-file";
+    private static final int UUID_LENGTH = 10;
+
+    private final Config config;
+    private final String clickhouseLocalPath;
+    private final List<String> fields;
+    private final ShardMetadata shardMetadata;
+    private final ClickhouseFileCopyMethod clickhouseFileCopyMethod;
+    // node address -> password, empty when NODE_FREE_PASSWORD is enabled
+    private final Map<String, String> nodePassword;
+
+    private final ClickhouseClient clickhouseClient;
+    private final ShardRouter shardRouter;
+    private final ClickhouseTable clickhouseTable;
+    private final Map<String, String> schemaMap;
+    private final Map<Shard, List<String>> shardLocalDataPaths;
+
+    // In most of the case, the data has been already shuffled in ClickhouseFileBatchSink#outputBatch
+    private final Map<Shard, List<Row>> rowCache;
+
+    /**
+     * Creates the output format and eagerly loads the table metadata from ClickHouse.
+     *
+     * @param config        sink configuration.
+     * @param shardMetadata metadata used to route rows to shards.
+     * @param fields        names of the row fields to write.
+     * @throws IOException      declared for callers; not thrown by the code visible here.
+     * @throws RuntimeException if passwords are required and a shard has no configured password.
+     */
+    public ClickhouseFileOutputFormat(Config config, ShardMetadata shardMetadata, List<String> fields) throws IOException {
+        this.config = config;
+        this.clickhouseLocalPath = config.getString(CLICKHOUSE_LOCAL_PATH);
+        this.shardMetadata = shardMetadata;
+        this.fields = fields;
+        this.clickhouseFileCopyMethod = ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD));
+        final boolean nodeFreePassword = TypesafeConfigUtils.getConfig(config, NODE_FREE_PASSWORD, true);
+        if (nodeFreePassword) {
+            this.nodePassword = Collections.emptyMap();
+        } else {
+            this.nodePassword = config.getObjectList(NODE_PASS).stream()
+                .collect(Collectors.toMap(
+                    configObject -> configObject.toConfig().getString(NODE_ADDRESS),
+                    configObject -> configObject.toConfig().getString(PASSWORD)));
+        }
+        clickhouseClient = new ClickhouseClient(config);
+        shardRouter = new ShardRouter(clickhouseClient, shardMetadata);
+        clickhouseTable = clickhouseClient.getClickhouseTable(config.getString(DATABASE), config.getString(TABLE));
+        schemaMap = clickhouseClient.getClickhouseTableSchema(config.getString(TABLE));
+
+        rowCache = new HashMap<>(shardRouter.getShards().keySet().size());
+        if (!nodeFreePassword) {
+            // A password may be registered under either the host address or the hostname.
+            shardRouter.getShards().values().forEach(shard -> {
+                if (!nodePassword.containsKey(shard.getHostAddress()) && !nodePassword.containsKey(shard.getHostname())) {
+                    throw new RuntimeException("Cannot find password of shard " + shard.getHostAddress());
+                }
+            });
+        }
+        shardLocalDataPaths = shardRouter.getShards().values().stream()
+            .collect(Collectors.toMap(
+                Function.identity(),
+                shard -> {
+                    ClickhouseTable shardTable = clickhouseClient
+                        .getClickhouseTable(shard.getDatabase(), clickhouseTable.getLocalTableName());
+                    return shardTable.getDataPaths();
+                }));
+    }
+
+    /**
+     * Groups the records by target shard and flushes every group to its shard.
+     *
+     * @param records rows to write.
+     */
+    public void writeRecords(Iterable<Row> records) {
+        for (Row record : records) {
+            Shard shard = shardRouter.getShard(record);
+            rowCache.computeIfAbsent(shard, k -> new ArrayList<>()).add(record);
+        }
+        for (Map.Entry<Shard, List<Row>> entry : rowCache.entrySet()) {
+            Shard shard = entry.getKey();
+            List<Row> rows = entry.getValue();
+            flush(shard, rows);
+            rows.clear();
+        }
+    }
+
+    /**
+     * Generates the local parts for the rows, attaches them to the shard and removes the
+     * local working directory.
+     */
+    private void flush(Shard shard, List<Row> rows) {
+        try {
+            // generate clickhouse local file
+            List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(shard, rows);
+            // move file to server
+            attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+            // clear local file
+            clearLocalFileDirectory(clickhouseLocalFiles);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the surrounding task can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Flush data into clickhouse file error", e);
+        } catch (Exception e) {
+            throw new RuntimeException("Flush data into clickhouse file error", e);
+        }
+    }
+
+    /**
+     * Dumps the rows into a tab separated file and converts it into table parts by running
+     * the configured ClickHouse binary in {@code local} mode.
+     *
+     * @return absolute paths of the generated part directories, empty if there are no rows.
+     * @throws RuntimeException if the command fails or does not produce any part.
+     */
+    private List<String> generateClickhouseLocalFiles(Shard shard, List<Row> rows) throws IOException, InterruptedException {
+        if (CollectionUtils.isEmpty(rows)) {
+            return Collections.emptyList();
+        }
+        String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_");
+        String clickhouseLocalFile = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid);
+        FileUtils.forceMkdir(new File(clickhouseLocalFile));
+        String clickhouseLocalFileTmpFile = clickhouseLocalFile + "/local_data.log";
+        // NOTE(review): a null field value fails here with a NullPointerException - confirm
+        // whether null values have to be supported.
+        String data = rows.stream()
+            .map(row -> fields.stream().map(field -> row.getField(field).toString())
+                .collect(Collectors.joining("\t")))
+            .collect(Collectors.joining("\n"));
+        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
+        // Close the channel once the data is written, otherwise one file descriptor leaks per flush.
+        try (FileChannel fileChannel = FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE,
+            StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
+            MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), dataBytes.length);
+            buffer.put(dataBytes);
+            // Make sure the content reached the file before the external process reads it.
+            buffer.force();
+        }
+
+        List<String> command = new ArrayList<>();
+        command.add("cat");
+        command.add(clickhouseLocalFileTmpFile);
+        command.add("|");
+
+        command.addAll(Arrays.asList(clickhouseLocalPath.trim().split(" ")));
+        command.add("local");
+        command.add("-S");
+        command.add("\"" + fields.stream().map(field -> field + " " + schemaMap.get(field)).collect(Collectors.joining(",")) + "\"");
+        command.add("-N");
+        command.add("\"" + "temp_table" + uuid + "\"");
+        command.add("-q");
+        command.add(String.format(
+            "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
+            clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() + ".", "").replaceAll("`", ""),
+            clickhouseTable.getLocalTableName(),
+            // columns that are not written are filled with NULL
+            schemaMap.entrySet().stream()
+                .map(entry -> fields.contains(entry.getKey()) ? entry.getKey() : "NULL")
+                .collect(Collectors.joining(",")),
+            uuid));
+        command.add("--path");
+        command.add("\"" + clickhouseLocalFile + "\"");
+        LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command));
+        ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command));
+        // Merge stderr into stdout: error output gets logged and an undrained stderr pipe
+        // can no longer block the child process.
+        processBuilder.redirectErrorStream(true);
+        Process start = processBuilder.start();
+        // drain the output until the process closes it
+        try (InputStream inputStream = start.getInputStream();
+             InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+             BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
+            String line;
+            while ((line = bufferedReader.readLine()) != null) {
+                LOGGER.info(line);
+            }
+        }
+        int exitCode = start.waitFor();
+        if (exitCode != 0) {
+            throw new RuntimeException("Generate clickhouse local file failed, exit code: " + exitCode);
+        }
+        File file = new File(clickhouseLocalFile + "/data/_local/" + clickhouseTable.getLocalTableName());
+        if (!file.exists()) {
+            throw new RuntimeException("clickhouse local file not exists");
+        }
+        File[] files = file.listFiles();
+        if (files == null) {
+            throw new RuntimeException("clickhouse local file not exists");
+        }
+        List<String> parts = Arrays.stream(files)
+            .filter(File::isDirectory)
+            .filter(f -> !"detached".equals(f.getName()))
+            .map(File::getAbsolutePath).collect(Collectors.toList());
+        if (parts.isEmpty()) {
+            // Rows were supplied but nothing can be attached; fail with a clear message.
+            throw new RuntimeException("clickhouse local did not generate any data part in " + file.getAbsolutePath());
+        }
+        return parts;
+    }
+
+    /**
+     * Copies the generated parts into the {@code detached/} directory of the shard and
+     * attaches each of them to the local table.
+     *
+     * @throws RuntimeException if the copy method is not supported or attaching a part fails.
+     */
+    private void attachClickhouseLocalFileToServer(Shard shard, List<String> clickhouseLocalFiles) {
+        if (ClickhouseFileCopyMethod.SCP.equals(clickhouseFileCopyMethod)) {
+            String hostAddress = shard.getHostAddress();
+            // The constructor accepts passwords keyed by host address or hostname, so try both.
+            String password = nodePassword.getOrDefault(hostAddress, nodePassword.get(shard.getHostname()));
+            FileTransfer fileTransfer = new ScpFileTransfer(hostAddress, password);
+            fileTransfer.init();
+            try {
+                fileTransfer.transferAndChown(clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) + "detached/");
+            } finally {
+                // release the transfer even when the copy fails
+                fileTransfer.close();
+            }
+        } else {
+            throw new RuntimeException("unsupported clickhouse file copy method " + clickhouseFileCopyMethod);
+        }
+
+        try (ClickHouseConnectionImpl clickhouseConnection = clickhouseClient.getClickhouseConnection(shard);
+             java.sql.Statement statement = clickhouseConnection.createStatement()) {
+            for (String clickhouseLocalFile : clickhouseLocalFiles) {
+                // the part name is the last segment of the local part directory
+                statement.execute(String.format("ALTER TABLE %s ATTACH PART '%s'",
+                    clickhouseTable.getLocalTableName(),
+                    clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 1)));
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to attach part to table " + clickhouseTable.getLocalTableName()
+                + " on shard " + shard.getHostAddress(), e);
+        }
+    }
+
+    /**
+     * Deletes the local working directory the given parts were generated in.
+     *
+     * @param clickhouseLocalFiles absolute part paths returned by
+     *                             {@link #generateClickhouseLocalFiles(Shard, List)}.
+     */
+    private void clearLocalFileDirectory(List<String> clickhouseLocalFiles) {
+        if (CollectionUtils.isEmpty(clickhouseLocalFiles)) {
+            // nothing was generated, so there is no directory to derive and delete
+            return;
+        }
+        String clickhouseLocalFile = clickhouseLocalFiles.get(0);
+        // working directory = prefix + "/" + uuid
+        String localFileDir = clickhouseLocalFile.substring(0, CLICKHOUSE_LOCAL_FILE_PREFIX.length() + UUID_LENGTH + 1);
+        try {
+            File file = new File(localFileDir);
+            if (file.exists()) {
+                FileUtils.deleteDirectory(file);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to delete directory " + localFileDir, e);
+        }
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
index c14b99c6..2e6d121d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
@@ -26,13 +26,17 @@ import static org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.flink.clickhouse.pojo.DistributedEngine;
import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
+import org.apache.seatunnel.flink.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;
import ru.yandex.clickhouse.ClickHouseStatement;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -42,6 +46,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@SuppressWarnings("magicnumber")
@@ -49,6 +54,8 @@ public class ClickhouseClient {
private final BalancedClickhouseDataSource balancedClickhouseDataSource;
+ private Map<Shard, BalancedClickhouseDataSource> shardToDataSource = new ConcurrentHashMap<>(16);
+
public ClickhouseClient(Config config) {
Properties clickhouseProperties = new Properties();
if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) {
@@ -70,6 +77,18 @@ public class ClickhouseClient {
}
}
+    /**
+     * Get a connection to the given shard. The data source of each shard is created on first
+     * use with the properties of the default data source and cached afterwards.
+     *
+     * @param shard target shard.
+     * @return connection to the shard.
+     * @throws RuntimeException if the connection cannot be established.
+     */
+    public ClickHouseConnectionImpl getClickhouseConnection(Shard shard) {
+        BalancedClickhouseDataSource shardDatasource = shardToDataSource.computeIfAbsent(shard, s -> {
+            ClickHouseProperties properties = this.balancedClickhouseDataSource.getProperties();
+            return new BalancedClickhouseDataSource(s.getJdbcUrl(), properties);
+        });
+        try {
+            return (ClickHouseConnectionImpl) shardDatasource.getConnection();
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot connect to target shard " + shard.getJdbcUrl(), e);
+        }
+    }
+
public DistributedEngine getClickhouseDistributedTable(String database, String table) {
try (ClickHouseConnection connection = getClickhouseConnection()) {
return getClickhouseDistributedTable(connection, database, table);
@@ -95,6 +114,12 @@ public class ClickhouseClient {
}
}
+ /**
+ * Get ClickHouse table schema, the key is the field name, the value is the field type.
+ *
+ * @param table table name.
+ * @return schema map.
+ */
public Map<String, String> getClickhouseTableSchema(String table) {
try (ClickHouseConnection connection = getClickhouseConnection()) {
return getClickhouseTableSchema(connection, table);
@@ -117,6 +142,15 @@ public class ClickhouseClient {
return schema;
}
+ /**
+ * Get the shard of the given cluster.
+ *
+ * @param connection clickhouse connection.
+ * @param clusterName cluster name.
+ * @param database database of the shard.
+ * @param port port of the shard.
+ * @return shard list.
+ */
public List<Shard> getClusterShardList(ClickHouseConnection connection, String clusterName, String database, String port) {
String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "'";
List<Shard> shardList = new ArrayList<>();
@@ -138,4 +172,44 @@ public class ClickhouseClient {
}
}
+    /**
+     * Get ClickHouse table info by querying {@code system.tables}.
+     *
+     * <p>For a table with the {@code Distributed} engine the distributed engine info is
+     * resolved as well; for every other engine it is {@code null}.
+     *
+     * @param database database of the table.
+     * @param table    table name of the table.
+     * @return clickhouse table info.
+     * @throws RuntimeException if the table does not exist or the query fails.
+     */
+    public ClickhouseTable getClickhouseTable(String database, String table) {
+        String sql = String.format("select engine,create_table_query,engine_full,data_paths from system.tables where database = '%s' and name = '%s'", database, table);
+        // The result set is part of the try-with-resources so that it is always closed.
+        try (ClickHouseConnection connection = balancedClickhouseDataSource.getConnection();
+             ClickHouseStatement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery(sql)) {
+            if (!resultSet.next()) {
+                throw new RuntimeException("Cannot get table " + database + "." + table + " from clickhouse, resultSet is empty");
+            }
+
+            String engine = resultSet.getString(1);
+            String createTableDDL = resultSet.getString(2);
+            String engineFull = resultSet.getString(3);
+            // data_paths is returned like ['/path/a/','/path/b/']; swap the quotes to get a JSON array
+            List<String> dataPaths = JSON.parseObject(resultSet.getString(4).replaceAll("'", "\""), new TypeReference<List<String>>() {
+            });
+            DistributedEngine distributedEngine = null;
+            if ("Distributed".equals(engine)) {
+                distributedEngine = getClickhouseDistributedTable(connection, database, table);
+            }
+            return new ClickhouseTable(
+                database,
+                table,
+                distributedEngine,
+                engine,
+                createTableDDL,
+                engineFull,
+                dataPaths,
+                getClickhouseTableSchema(connection, table));
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot get clickhouse table " + database + "." + table, e);
+        }
+    }
+
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java
index e6ad4438..cfbeb13c 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ShardRouter.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.types.Row;
import ru.yandex.clickhouse.ClickHouseConnection;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
@@ -34,7 +35,9 @@ import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
-public class ShardRouter {
+public class ShardRouter implements Serializable {
+
+ private static final long serialVersionUID = -1L;
private String shardTable;
private final String table;
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ClickhouseTable.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ClickhouseTable.java
new file mode 100644
index 00000000..fcbccfd6
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ClickhouseTable.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink.file;
+
+import org.apache.seatunnel.flink.clickhouse.pojo.DistributedEngine;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mutable holder for the metadata of a single ClickHouse table: where it lives, which engine
+ * backs it, how it was created, where its data is stored and what its columns are.
+ */
+public class ClickhouseTable {
+
+    // Identity of the table.
+    private String database;
+    private String tableName;
+
+    // Engine information and the statement the table was created with.
+    private String engine;
+    private String engineFull;
+    private String createTableDDL;
+
+    // Data directories reported for the table.
+    private List<String> dataPaths;
+
+    // Set only when the table uses a distributed engine, otherwise null.
+    private DistributedEngine distributedEngine;
+
+    // Table schema as a string-to-string map.
+    private Map<String, String> tableSchema;
+
+    /**
+     * Creates a fully populated table description.
+     *
+     * @param database          database the table belongs to
+     * @param tableName         name of the table
+     * @param distributedEngine distributed engine details, or {@code null} if not distributed
+     * @param engine            engine name
+     * @param createTableDDL    create-table statement of the table
+     * @param engineFull        full engine definition
+     * @param dataPaths         data directories of the table
+     * @param tableSchema       schema of the table
+     */
+    public ClickhouseTable(String database,
+                           String tableName,
+                           DistributedEngine distributedEngine,
+                           String engine,
+                           String createTableDDL,
+                           String engineFull,
+                           List<String> dataPaths,
+                           Map<String, String> tableSchema) {
+        this.database = database;
+        this.tableName = tableName;
+        this.engine = engine;
+        this.engineFull = engineFull;
+        this.createTableDDL = createTableDDL;
+        this.dataPaths = dataPaths;
+        this.distributedEngine = distributedEngine;
+        this.tableSchema = tableSchema;
+    }
+
+    /**
+     * Returns the name of the table that physically stores the data: the table referenced by
+     * the distributed engine when one is set, otherwise this table's own name.
+     */
+    public String getLocalTableName() {
+        return distributedEngine == null ? tableName : distributedEngine.getTable();
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public String getEngine() {
+        return engine;
+    }
+
+    public void setEngine(String engine) {
+        this.engine = engine;
+    }
+
+    public String getEngineFull() {
+        return engineFull;
+    }
+
+    public void setEngineFull(String engineFull) {
+        this.engineFull = engineFull;
+    }
+
+    public String getCreateTableDDL() {
+        return createTableDDL;
+    }
+
+    public void setCreateTableDDL(String createTableDDL) {
+        this.createTableDDL = createTableDDL;
+    }
+
+    public List<String> getDataPaths() {
+        return dataPaths;
+    }
+
+    public void setDataPaths(List<String> dataPaths) {
+        this.dataPaths = dataPaths;
+    }
+
+    public Map<String, String> getTableSchema() {
+        return tableSchema;
+    }
+
+    public void setTableSchema(Map<String, String> tableSchema) {
+        this.tableSchema = tableSchema;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/FileTransfer.java
similarity index 73%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/FileTransfer.java
index 68e2178a..e84d2393 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/pojo/IntHolder.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/FileTransfer.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.flink.clickhouse.pojo;
+package org.apache.seatunnel.flink.clickhouse.sink.file;
-public class IntHolder {
- private int value;
+import java.util.List;
- public int getValue() {
- return value;
- }
+/**
+ * Abstraction for copying locally available files to a target path and adjusting their
+ * ownership there. The expected call order appears to be {@code init()}, one or more
+ * {@code transferAndChown(...)} calls, then {@code close()} - TODO confirm against callers.
+ */
+public interface FileTransfer {
- public void setValue(int value) {
- this.value = value;
- }
+ /**
+ * Prepares the transfer for use. In ScpFileTransfer this opens and authenticates the SSH session.
+ */
+ void init();
+
+ /**
+ * Transfers a single source path to the target path and changes the owner of the result.
+ *
+ * @param sourcePath path of the file or directory to transfer
+ * @param targetPath destination path
+ */
+ void transferAndChown(String sourcePath, String targetPath);
+
+ /**
+ * Transfers several source paths to the same target path and changes the owner of the result.
+ *
+ * @param sourcePath paths of the files or directories to transfer
+ * @param targetPath destination path shared by all sources
+ */
+ void transferAndChown(List<String> sourcePath, String targetPath);
+
+ /**
+ * Releases whatever the transfer holds. In ScpFileTransfer this closes the SSH session and client.
+ */
+ void close();
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ScpFileTransfer.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ScpFileTransfer.java
new file mode 100644
index 00000000..0b1c58cc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/file/ScpFileTransfer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink.file;
+
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.scp.client.ScpClient;
+import org.apache.sshd.scp.client.ScpClientCreator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ScpFileTransfer implements FileTransfer {
+
+ private static final int SCP_PORT = 22;
+
+ private final String host;
+ private final String password;
+
+ private ScpClient scpClient;
+ private ClientSession clientSession;
+ private SshClient sshClient;
+
+ public ScpFileTransfer(String host, String password) {
+ this.host = host;
+ this.password = password;
+ }
+
+ @Override
+ public void init() {
+ try {
+ sshClient = SshClient.setUpDefaultClient();
+ sshClient.start();
+ clientSession = sshClient.connect("root", host, SCP_PORT).verify().getSession();
+ if (password != null) {
+ clientSession.addPasswordIdentity(password);
+ }
+ if (!clientSession.auth().verify().isSuccess()) {
+ throw new IOException("ssh host " + host + "authentication failed");
+ }
+ scpClient = ScpClientCreator.instance().createScpClient(clientSession);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to connect to host: " + host + " by user: root on port 22", e);
+ }
+ }
+
+ @Override
+ public void transferAndChown(String sourcePath, String targetPath) {
+ try {
+ scpClient.upload(
+ sourcePath,
+ targetPath,
+ ScpClient.Option.Recursive,
+ ScpClient.Option.TargetIsDirectory,
+ ScpClient.Option.PreserveAttributes);
+ } catch (IOException e) {
+ throw new RuntimeException("Scp failed to transfer file: " + sourcePath + " to: " + targetPath, e);
+ }
+ // remote exec command to change file owner. Only file owner equal with server's clickhouse user can
+ // make ATTACH command work.
+ List<String> command = new ArrayList<>();
+ command.add("ls");
+ command.add("-l");
+ command.add(targetPath.substring(0, targetPath.lastIndexOf("/")));
+ command.add("/ | tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
+ try {
+ clientSession.executeRemoteCommand(String.join(" ", command));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to execute remote command: " + command, e);
+ }
+ }
+
+ @Override
+ public void transferAndChown(List<String> sourcePaths, String targetPath) {
+ if (sourcePaths == null) {
+ throw new IllegalArgumentException("sourcePath is null");
+ }
+ sourcePaths.forEach(sourcePath -> transferAndChown(sourcePath, targetPath));
+ }
+
+ @Override
+ public void close() {
+ if (clientSession != null && clientSession.isOpen()) {
+ try {
+ clientSession.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to close ssh session", e);
+ }
+ }
+ if (sshClient != null && sshClient.isOpen()) {
+ sshClient.stop();
+ try {
+ sshClient.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to close ssh client", e);
+ }
+ }
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
index 35104c33..acd3d6a5 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
@@ -16,3 +16,4 @@
#
org.apache.seatunnel.flink.clickhouse.sink.ClickhouseBatchSink
+org.apache.seatunnel.flink.clickhouse.sink.ClickhouseFileBatchSink
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
index 9a349712..adbea780 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
@@ -45,7 +45,7 @@ public class FakeSourceToClickhouseIT extends FlinkContainer {
private GenericContainer<?> clickhouseServer;
private BalancedClickhouseDataSource dataSource;
- private static final String CLICKHOUSE_DOCKER_IMAGE = "yandex/clickhouse-server:21.3.20.1";
+ private static final String CLICKHOUSE_DOCKER_IMAGE = "yandex/clickhouse-server:22.1.3.7";
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToClickhouseIT.class);