You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/28 11:10:26 UTC
[incubator-seatunnel] branch api-draft updated: [API-Draft] [Connector] Add Clickhouse source and sink connector (#2051)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 3057ba205 [API-Draft] [Connector] Add Clickhouse source and sink connector (#2051)
3057ba205 is described below
commit 3057ba205f1fa4807eee6b2fcbdc675a1cdd50d5
Author: Hisoka <fa...@qq.com>
AuthorDate: Tue Jun 28 19:10:19 2022 +0800
[API-Draft] [Connector] Add Clickhouse source and sink connector (#2051)
---
.../org/apache/seatunnel/api/sink/SinkWriter.java | 2 +-
.../seatunnel/api/table/type/SeaTunnelRowType.java | 9 +
seatunnel-connectors/plugin-mapping.properties | 3 +
.../flink/clickhouse/sink/ClickhouseBatchSink.java | 3 -
.../clickhouse/sink/ClickhouseOutputFormat.java | 2 +-
.../pom.xml | 23 ++
.../{Config.java => ClickhouseFileCopyMethod.java} | 37 ++--
.../seatunnel/clickhouse/config/Config.java | 71 +++++-
.../clickhouse/config/FileReaderOption.java | 113 ++++++++++
.../seatunnel/clickhouse/config/ReaderOption.java | 94 ++++++++
.../seatunnel/clickhouse/shard/Shard.java | 98 +++++++++
.../seatunnel/clickhouse/shard/ShardMetadata.java | 145 ++++++++++++
.../clickhouse/sink/DistributedEngine.java | 58 +++++
.../sink/client/ClickhouseBatchStatement.java | 52 +++++
.../clickhouse/sink/client/ClickhouseProxy.java | 212 ++++++++++++++++++
.../clickhouse/sink/client/ClickhouseSink.java | 172 +++++++++++++++
.../sink/client/ClickhouseSinkWriter.java | 230 +++++++++++++++++++
.../clickhouse/sink/client/ShardRouter.java | 97 ++++++++
.../clickhouse/sink/file/ClickhouseFileSink.java | 145 ++++++++++++
.../sink/file/ClickhouseFileSinkWriter.java | 243 +++++++++++++++++++++
.../clickhouse/sink/file/ClickhouseTable.java | 117 ++++++++++
.../Config.java => sink/file/FileTransfer.java} | 18 +-
.../clickhouse/sink/file/ScpFileTransfer.java | 124 +++++++++++
.../inject/ArrayInjectFunction.java} | 26 ++-
.../inject/BigDecimalInjectFunction.java} | 26 ++-
.../sink/inject/ClickhouseFieldInjectFunction.java | 46 ++++
.../inject/DateInjectFunction.java} | 37 ++--
.../inject/DateTimeInjectFunction.java} | 37 ++--
.../sink/inject/DoubleInjectFunction.java | 41 ++++
.../inject/FloatInjectFunction.java} | 37 ++--
.../clickhouse/sink/inject/IntInjectFunction.java | 47 ++++
.../inject/LongInjectFunction.java} | 27 +--
.../inject/StringInjectFunction.java} | 25 +--
.../clickhouse/source/ClickhouseSource.java | 18 +-
.../source/ClickhouseSourceSplitEnumerator.java | 1 +
.../Config.java => state/CKAggCommitInfo.java} | 18 +-
.../Config.java => state/CKCommitInfo.java} | 18 +-
.../Config.java => state/ClickhouseSinkState.java} | 18 +-
.../{config/Config.java => tool/IntHolder.java} | 22 +-
.../seatunnel/clickhouse/util/ClickhouseUtil.java | 40 ++++
seatunnel-dist/release-docs/LICENSE | 5 +-
seatunnel-dist/release-docs/NOTICE | 12 +
.../plugin/discovery/AbstractPluginDiscovery.java | 4 +-
.../spark/serialization/InternalRowConverter.java | 3 +
.../translation/spark/sink/SparkDataWriter.java | 1 +
tools/dependencies/known-dependencies.txt | 5 +
46 files changed, 2373 insertions(+), 209 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 56f97bac9..268d3d40e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -74,7 +74,7 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
*/
void close() throws IOException;
- interface Context {
+ interface Context extends Serializable{
/**
* Gets the configuration with which Job was started.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
index 848cdf43c..45164c246 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
@@ -76,6 +76,15 @@ public class SeaTunnelRowType implements CompositeType<SeaTunnelRow> {
return fieldTypes[index];
}
+ public int indexOf(String fieldName) { // returns the position of the first entry of fieldNames equal to fieldName
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (fieldNames[i].equals(fieldName)) {
+ return i;
+ }
+ }
+ throw new IllegalArgumentException(String.format("can't find field %s", fieldName)); // no entry matched
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties
index d06354594..b14f2f035 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -92,5 +92,8 @@ seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
seatunnel.source.Http = seatunnel-connector-seatunnel-http
seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
+seatunnel.source.Clickhouse = seatunnel-connector-seatunnel-clickhouse
+seatunnel.sink.Clickhouse = seatunnel-connector-seatunnel-clickhouse
+seatunnel.sink.ClickhouseFile = seatunnel-connector-seatunnel-clickhouse
seatunnel.source.Jdbc = seatunnel-connector-seatunnel-jdbc
seatunnel.sink.Jdbc = seatunnel-connector-seatunnel-jdbc
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
index 620e921e1..36ab1a3d5 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
@@ -46,8 +46,6 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;
import ru.yandex.clickhouse.ClickHouseConnection;
-import javax.annotation.Nullable;
-
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -73,7 +71,6 @@ public class ClickhouseBatchSink implements FlinkBatchSink {
return config;
}
- @Nullable
@Override
public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
ClickhouseOutputFormat clickhouseOutputFormat = new ClickhouseOutputFormat(config, shardMetadata, fields, tableSchema);
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java
index cb392b9a0..efd4eb7d5 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java
@@ -224,7 +224,7 @@ public class ClickhouseOutputFormat extends RichOutputFormat<Row> {
break;
}
}
- result.put(field, function);
+ result.put(fieldType, function);
}
return result;
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
index e3fb148aa..38c2f6db4 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml
@@ -36,12 +36,35 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.sshd</groupId>
+ <artifactId>sshd-scp</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
<!-- TODO add to dependency management after version unify -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-http-client</artifactId>
<version>0.3.2-patch9</version>
</dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.11.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.clickhouse</groupId>
+ <artifactId>clickhouse-jdbc</artifactId>
+ <version>0.3.2-patch9</version>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
similarity index 57%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
index 65b7af7c6..cec1f48bb 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
@@ -17,19 +17,26 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
-
- public static final String SQL = "sql";
-
- public static final String USERNAME = "username";
-
- public static final String PASSWORD = "password";
-
+public enum ClickhouseFileCopyMethod { // file copy methods, each paired with a lower-case string name
+ SCP("scp"),
+ RSYNC("rsync"),
+ ;
+ private final String name;
+ /** Stores the string name that belongs to the constant. */
+ ClickhouseFileCopyMethod(String name) {
+ this.name = name;
+ }
+ /** Returns the string name passed to the constructor of this constant ("scp" or "rsync"). */
+ public String getName() {
+ return name;
+ }
+ /** Returns the constant whose name equals the argument ignoring case; throws IllegalArgumentException when none matches. */
+ public static ClickhouseFileCopyMethod from(String name) {
+ for (ClickhouseFileCopyMethod clickhouseFileCopyMethod : ClickhouseFileCopyMethod.values()) {
+ if (clickhouseFileCopyMethod.getName().equalsIgnoreCase(name)) {
+ return clickhouseFileCopyMethod;
+ }
+ }
+ throw new IllegalArgumentException("Unknown ClickhouseFileCopyMethod: " + name);
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
index 65b7af7c6..6563274ba 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
@@ -17,19 +17,80 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-/**
- * The config of clickhouse
- */
public class Config {
- public static final String NODE_ADDRESS = "node_address";
+ /**
+ * Bulk size of clickhouse jdbc
+ */
+ public static final String BULK_SIZE = "bulk_size";
- public static final String DATABASE = "database";
+ /**
+ * Clickhouse fields
+ */
+ public static final String FIELDS = "fields";
public static final String SQL = "sql";
+ /**
+ * Clickhouse server host
+ */
+ public static final String HOST = "host";
+
+ /**
+ * Clickhouse table name
+ */
+ public static final String TABLE = "table";
+
+ /**
+ * Clickhouse database name
+ */
+ public static final String DATABASE = "database";
+
+ /**
+ * Clickhouse server username
+ */
public static final String USERNAME = "username";
+ /**
+ * Clickhouse server password
+ */
public static final String PASSWORD = "password";
+ /**
+ * Split mode when table is distributed engine
+ */
+ public static final String SPLIT_MODE = "split_mode";
+
+ /**
+ * When split_mode is true, the sharding key used to split the data
+ */
+ public static final String SHARDING_KEY = "sharding_key";
+
+ /**
+ * Path of the clickhouse-local program used by the ClickhouseFile sink connector
+ */
+ public static final String CLICKHOUSE_LOCAL_PATH = "clickhouse_local_path";
+
+ /**
+ * The method used to copy Clickhouse files
+ */
+ public static final String COPY_METHOD = "copy_method";
+
+ /**
+ * The size of each batch read temporary data into local file.
+ */
+ public static final String TMP_BATCH_CACHE_LINE = "tmp_batch_cache_line";
+
+ /**
+ * The password of Clickhouse server node
+ */
+ public static final String NODE_PASS = "node_pass";
+
+ /**
+ * The address of Clickhouse server node
+ */
+ public static final String NODE_ADDRESS = "node_address";
+
+ public static final String CLICKHOUSE_PREFIX = "clickhouse.";
+
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
new file mode 100644
index 000000000..720810789
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class FileReaderOption implements Serializable {
+ // Mutable, serializable holder for the options declared below; every value is exposed through a plain getter/setter pair.
+ private ShardMetadata shardMetadata;
+ private Map<String, String> tableSchema;
+ private List<String> fields;
+ private String clickhouseLocalPath;
+ private ClickhouseFileCopyMethod copyMethod;
+ private boolean nodeFreePass;
+ private Map<String, String> nodePassword;
+ private SeaTunnelRowType seaTunnelRowType;
+ /** Assigns six of the eight fields; nodeFreePass and seaTunnelRowType keep their defaults (false / null) until their setters are called. */
+ public FileReaderOption(ShardMetadata shardMetadata, Map<String, String> tableSchema,
+ List<String> fields, String clickhouseLocalPath,
+ ClickhouseFileCopyMethod copyMethod,
+ Map<String, String> nodePassword) {
+ this.shardMetadata = shardMetadata;
+ this.tableSchema = tableSchema;
+ this.fields = fields;
+ this.clickhouseLocalPath = clickhouseLocalPath;
+ this.copyMethod = copyMethod;
+ this.nodePassword = nodePassword;
+ }
+ /** Returns null until setSeaTunnelRowType has been called, because the constructor never assigns this field. */
+ public SeaTunnelRowType getSeaTunnelRowType() {
+ return seaTunnelRowType;
+ }
+
+ public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+ /** Returns false until setNodeFreePass has been called, because the constructor never assigns this field. */
+ public boolean isNodeFreePass() {
+ return nodeFreePass;
+ }
+
+ public void setNodeFreePass(boolean nodeFreePass) {
+ this.nodeFreePass = nodeFreePass;
+ }
+
+ public String getClickhouseLocalPath() {
+ return clickhouseLocalPath;
+ }
+
+ public void setClickhouseLocalPath(String clickhouseLocalPath) {
+ this.clickhouseLocalPath = clickhouseLocalPath;
+ }
+
+ public ClickhouseFileCopyMethod getCopyMethod() {
+ return copyMethod;
+ }
+
+ public void setCopyMethod(ClickhouseFileCopyMethod copyMethod) {
+ this.copyMethod = copyMethod;
+ }
+ /** Returns the stored map itself, not a copy. NOTE(review): presumably keyed by node address — confirm against the caller. */
+ public Map<String, String> getNodePassword() {
+ return nodePassword;
+ }
+
+ public void setNodePassword(Map<String, String> nodePassword) {
+ this.nodePassword = nodePassword;
+ }
+
+ public ShardMetadata getShardMetadata() {
+ return shardMetadata;
+ }
+
+ public void setShardMetadata(ShardMetadata shardMetadata) {
+ this.shardMetadata = shardMetadata;
+ }
+
+ public Map<String, String> getTableSchema() {
+ return tableSchema;
+ }
+
+ public void setTableSchema(Map<String, String> tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ public List<String> getFields() {
+ return fields;
+ }
+
+ public void setFields(List<String> fields) {
+ this.fields = fields;
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
new file mode 100644
index 000000000..084f54bcc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ReaderOption implements Serializable {
+ // Mutable, serializable holder for the options declared below; every value is exposed through a plain getter/setter pair.
+ private ShardMetadata shardMetadata;
+ private List<String> fields;
+
+ private Map<String, String> tableSchema;
+ private SeaTunnelRowType seaTunnelRowType;
+ private Properties properties;
+ private int bulkSize;
+ /** Assigns every field except seaTunnelRowType, which stays null until setSeaTunnelRowType is called. */
+ public ReaderOption(ShardMetadata shardMetadata,
+ Properties properties, List<String> fields, Map<String, String> tableSchema, int bulkSize) {
+ this.shardMetadata = shardMetadata;
+ this.properties = properties;
+ this.fields = fields;
+ this.tableSchema = tableSchema;
+ this.bulkSize = bulkSize;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ public ShardMetadata getShardMetadata() {
+ return shardMetadata;
+ }
+
+ public void setShardMetadata(ShardMetadata shardMetadata) {
+ this.shardMetadata = shardMetadata;
+ }
+ /** Returns null until setSeaTunnelRowType has been called, because the constructor never assigns this field. */
+ public SeaTunnelRowType getSeaTunnelRowType() {
+ return seaTunnelRowType;
+ }
+
+ public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ public Map<String, String> getTableSchema() {
+ return tableSchema;
+ }
+
+ public void setTableSchema(Map<String, String> tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ public List<String> getFields() {
+ return fields;
+ }
+
+ public void setFields(List<String> fields) {
+ this.fields = fields;
+ }
+
+ public int getBulkSize() {
+ return bulkSize;
+ }
+
+ public void setBulkSize(int bulkSize) {
+ this.bulkSize = bulkSize;
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
new file mode 100644
index 000000000..2fe726325
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard;
+
+import com.clickhouse.client.ClickHouseCredentials;
+import com.clickhouse.client.ClickHouseNode;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+public class Shard implements Serializable {
+ private static final long serialVersionUID = -1L;
+
+ private final int shardNum;
+ private final int replicaNum;
+
+ private final ClickHouseNode node;
+
+ // cache the hash code
+ private int hashCode = -1;
+
+ public Shard(int shardNum,
+ int shardWeight,
+ int replicaNum,
+ String hostname,
+ String hostAddress,
+ int port,
+ String database,
+ String username,
+ String password) {
+ this.shardNum = shardNum;
+ this.replicaNum = replicaNum;
+ this.node = ClickHouseNode.builder().host(hostname).address(InetSocketAddress.createUnresolved(hostAddress,
+ port)).database(database).weight(shardWeight).credentials(ClickHouseCredentials.fromUserAndPassword(username, password)).build();
+ }
+
+ public Shard(int shardNum, int replicaNum, ClickHouseNode node) {
+ this.shardNum = shardNum;
+ this.replicaNum = replicaNum;
+ this.node = node;
+ }
+
+ public int getShardNum() {
+ return shardNum;
+ }
+
+ public int getReplicaNum() {
+ return replicaNum;
+ }
+
+ public ClickHouseNode getNode() {
+ return node;
+ }
+
+ public String getJdbcUrl() {
+ return "jdbc:clickhouse://" + node.getAddress().getHostName()
+ + ":" + node.getAddress().getPort() + "/" + node.getDatabase().get();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Shard shard = (Shard) o;
+ return shardNum == shard.shardNum
+ && replicaNum == shard.replicaNum
+ && hashCode == shard.hashCode
+ && Objects.equals(node, shard.node);
+ }
+
+ @Override
+ public int hashCode() {
+ if (hashCode == -1) {
+ hashCode = Objects.hash(shardNum, replicaNum, node, hashCode);
+ }
+ return hashCode;
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
new file mode 100644
index 000000000..3c01922f1
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class ShardMetadata implements Serializable {
+ // Mutable, serializable holder for the shard key and its type, database, table, split-mode flag, default shard and credentials.
+ private static final long serialVersionUID = -1L;
+
+ private String shardKey;
+ private String shardKeyType;
+ private String database;
+ private String table;
+ private boolean splitMode;
+ private Shard defaultShard;
+ private String username;
+ private String password;
+ /** Assigns all eight fields from the arguments without validation. */
+ public ShardMetadata(String shardKey,
+ String shardKeyType,
+ String database,
+ String table,
+ boolean splitMode,
+ Shard defaultShard,
+ String username,
+ String password) {
+ this.shardKey = shardKey;
+ this.shardKeyType = shardKeyType;
+ this.database = database;
+ this.table = table;
+ this.splitMode = splitMode;
+ this.defaultShard = defaultShard;
+ this.username = username;
+ this.password = password;
+ }
+
+ public String getShardKey() {
+ return shardKey;
+ }
+
+ public void setShardKey(String shardKey) {
+ this.shardKey = shardKey;
+ }
+
+ public String getShardKeyType() {
+ return shardKeyType;
+ }
+
+ public void setShardKeyType(String shardKeyType) {
+ this.shardKeyType = shardKeyType;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+ /** Returns the same value as isSplitMode(). */
+ public boolean getSplitMode() {
+ return splitMode;
+ }
+
+ public void setSplitMode(boolean splitMode) {
+ this.splitMode = splitMode;
+ }
+
+ public Shard getDefaultShard() {
+ return defaultShard;
+ }
+
+ public void setDefaultShard(Shard defaultShard) {
+ this.defaultShard = defaultShard;
+ }
+ /** Returns the same value as getSplitMode(). */
+ public boolean isSplitMode() {
+ return splitMode;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+ /** Two instances are equal when they have the same class and all eight fields are equal. */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ShardMetadata that = (ShardMetadata) o;
+ return splitMode == that.splitMode
+ && Objects.equals(shardKey, that.shardKey)
+ && Objects.equals(shardKeyType, that.shardKeyType)
+ && Objects.equals(database, that.database)
+ && Objects.equals(table, that.table)
+ && Objects.equals(defaultShard, that.defaultShard)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password);
+ }
+ /** Hashes the same eight fields that equals() compares; the fields are mutable, so the value changes when a setter is called. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(shardKey, shardKeyType, database, table, splitMode, defaultShard, username, password);
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
new file mode 100644
index 000000000..6a15d5919
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
+
+import java.io.Serializable;
+
+public class DistributedEngine implements Serializable {
+ // Mutable, serializable holder for a cluster name, a database name and a table name.
+ private static final long serialVersionUID = -1L;
+ private String clusterName;
+ private String database;
+ private String table;
+ /** Assigns the three fields from the arguments without validation. */
+ public DistributedEngine(String clusterName, String database, String table) {
+ this.clusterName = clusterName;
+ this.database = database;
+ this.table = table;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
new file mode 100644
index 000000000..ae525acee
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
+
+import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
+
+import java.sql.PreparedStatement;
+
+/**
+ * Immutable bundle of a ClickHouse connection, a prepared statement and an {@link IntHolder}.
+ *
+ * <p>The class only stores the three references and hands them back; it does not open,
+ * execute or close anything itself.
+ */
+public class ClickhouseBatchStatement {
+
+    private final ClickHouseConnectionImpl clickHouseConnection;
+    private final PreparedStatement preparedStatement;
+    private final IntHolder intHolder;
+
+    /**
+     * @param clickHouseConnection connection to store
+     * @param preparedStatement    statement to store
+     * @param intHolder            counter holder to store
+     */
+    public ClickhouseBatchStatement(ClickHouseConnectionImpl clickHouseConnection,
+                                    PreparedStatement preparedStatement,
+                                    IntHolder intHolder) {
+        this.intHolder = intHolder;
+        this.preparedStatement = preparedStatement;
+        this.clickHouseConnection = clickHouseConnection;
+    }
+
+    /** @return the counter holder passed to the constructor */
+    public IntHolder getIntHolder() {
+        return this.intHolder;
+    }
+
+    /** @return the prepared statement passed to the constructor */
+    public PreparedStatement getPreparedStatement() {
+        return this.preparedStatement;
+    }
+
+    /** @return the connection passed to the constructor */
+    public ClickHouseConnectionImpl getClickHouseConnection() {
+        return this.clickHouseConnection;
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
new file mode 100644
index 000000000..3a3aa082c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
+
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseFormat;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseRecord;
+import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.ClickHouseResponse;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Helper around the ClickHouse Java client used to read table metadata: table schema,
+ * engine definition and cluster shard layout.
+ *
+ * <p>One client is created for the node passed to the constructor. Further clients are
+ * created on demand per {@link Shard} and cached; {@link #close()} closes all of them.
+ */
+@SuppressWarnings("magicnumber")
+public class ClickhouseProxy {
+
+    /** Literal prefix that starts the {@code engine_full} value of a Distributed table. */
+    private static final String DISTRIBUTED_ENGINE_PREFIX = "Distributed(";
+    /** Number of leading engine arguments (cluster, database, table) that must be present. */
+    private static final int DISTRIBUTED_ENGINE_MIN_ARGS = 3;
+
+    private final ClickHouseRequest<?> clickhouseRequest;
+    private final ClickHouseClient client;
+
+    private final Map<Shard, ClickHouseClient> shardToDataSource = new ConcurrentHashMap<>(16);
+
+    /**
+     * Creates a client for the protocol of {@code node} and prepares a request bound to it.
+     *
+     * @param node node that the default request is connected to
+     */
+    public ClickhouseProxy(ClickHouseNode node) {
+        this.client = ClickHouseClient.newInstance(node.getProtocol());
+        this.clickhouseRequest =
+            client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
+
+    }
+
+    /**
+     * @return the request created in the constructor for the default node
+     */
+    public ClickHouseRequest<?> getClickhouseConnection() {
+        return this.clickhouseRequest;
+    }
+
+    /**
+     * Returns a new request connected to the node of {@code shard}. The underlying client is
+     * created on first use for that shard and reused afterwards.
+     *
+     * @param shard shard whose node should be queried
+     * @return request bound to the shard's node
+     */
+    public ClickHouseRequest<?> getClickhouseConnection(Shard shard) {
+        ClickHouseClient c = shardToDataSource
+            .computeIfAbsent(shard, s -> ClickHouseClient.newInstance(s.getNode().getProtocol()));
+        return c.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
+    }
+
+    /**
+     * Looks up the Distributed engine definition of a table through the default request.
+     *
+     * @param database database of the distributed table
+     * @param table    name of the distributed table
+     * @return cluster, database and table named by the engine definition
+     */
+    public DistributedEngine getClickhouseDistributedTable(String database, String table) {
+        ClickHouseRequest<?> request = getClickhouseConnection();
+        return getClickhouseDistributedTable(request, database, table);
+    }
+
+    /**
+     * Looks up the Distributed engine definition of a table in {@code system.tables}.
+     *
+     * @param connection request used to run the query
+     * @param database   database of the distributed table
+     * @param table      name of the distributed table
+     * @return cluster, database and table named by the engine definition
+     * @throws RuntimeException if the query fails, returns no row, or the engine definition
+     *                          has fewer than three arguments
+     */
+    public DistributedEngine getClickhouseDistributedTable(ClickHouseRequest<?> connection, String database,
+                                                           String table) {
+        String sql = String.format("select engine_full from system.tables where database = '%s' and name = '%s' and engine = 'Distributed'", database, table);
+        try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
+            List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
+            if (!records.isEmpty()) {
+                ClickHouseRecord record = records.get(0);
+                // engineFull field will be like : Distributed(cluster, database, table[, sharding_key[, policy_name]])
+                return parseDistributedEngine(record.getValue(0).asString());
+            }
+            throw new RuntimeException("Cannot get distributed table from clickhouse, resultSet is empty");
+        } catch (ClickHouseException e) {
+            throw new RuntimeException("Cannot get distributed table from clickhouse", e);
+        }
+    }
+
+    /**
+     * Extracts cluster, database and table from an {@code engine_full} value.
+     *
+     * <p>The text after the {@code Distributed(} prefix is split on commas, then single quotes
+     * and surrounding blanks are dropped. When the table is the last argument it still carries
+     * the closing parenthesis, which is removed here. {@link String#replace} matches literally,
+     * so the parenthesis must not be regex-escaped.
+     *
+     * @param engineFull value of the {@code engine_full} column
+     * @return parsed engine description
+     */
+    private static DistributedEngine parseDistributedEngine(String engineFull) {
+        List<String> infos = Arrays.stream(engineFull.substring(DISTRIBUTED_ENGINE_PREFIX.length()).split(","))
+            .map(s -> s.replace("'", "").trim()).collect(Collectors.toList());
+        if (infos.size() < DISTRIBUTED_ENGINE_MIN_ARGS) {
+            throw new RuntimeException("Cannot parse distributed engine definition: " + engineFull);
+        }
+        return new DistributedEngine(infos.get(0), infos.get(1), infos.get(2).replace(")", "").trim());
+    }
+
+    /**
+     * Get ClickHouse table schema, the key is the field name, the value is the field type.
+     *
+     * @param table table name.
+     * @return schema map.
+     */
+    public Map<String, String> getClickhouseTableSchema(String table) {
+        ClickHouseRequest<?> request = getClickhouseConnection();
+        return getClickhouseTableSchema(request, table);
+    }
+
+    /**
+     * Runs {@code desc <table>} and collects the first two columns of every row.
+     *
+     * @param request request used to run the query
+     * @param table   table name.
+     * @return map from field name to field type, in the order the rows were returned
+     * @throws RuntimeException if the query fails
+     */
+    public Map<String, String> getClickhouseTableSchema(ClickHouseRequest<?> request, String table) {
+        String sql = "desc " + table;
+        Map<String, String> schema = new LinkedHashMap<>();
+        try (ClickHouseResponse response = request.query(sql).executeAndWait()) {
+            response.records().forEach(r -> schema.put(r.getValue(0).asString(), r.getValue(1).asString()));
+        } catch (ClickHouseException e) {
+            throw new RuntimeException("Cannot get table schema from clickhouse", e);
+        }
+        return schema;
+    }
+
+    /**
+     * Get the shard of the given cluster.
+     *
+     * <p>The {@code port} argument is handed to every {@link Shard}; the {@code port} column
+     * selected from {@code system.clusters} is not read.
+     *
+     * @param connection  clickhouse connection.
+     * @param clusterName cluster name.
+     * @param database    database of the shard.
+     * @param port        port of the shard.
+     * @param username    user name passed to every shard.
+     * @param password    password passed to every shard.
+     * @return shard list.
+     * @throws RuntimeException if the query fails
+     */
+    public List<Shard> getClusterShardList(ClickHouseRequest<?> connection, String clusterName,
+                                           String database, int port, String username, String password) {
+        String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "'";
+        List<Shard> shardList = new ArrayList<>();
+        try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
+            response.records().forEach(r -> {
+                shardList.add(new Shard(
+                    r.getValue(0).asInteger(),
+                    r.getValue(1).asInteger(),
+                    r.getValue(2).asInteger(),
+                    r.getValue(3).asString(),
+                    r.getValue(4).asString(),
+                    port, database, username, password));
+            });
+            return shardList;
+        } catch (ClickHouseException e) {
+            throw new RuntimeException("Cannot get cluster shard list from clickhouse", e);
+        }
+    }
+
+    /**
+     * Get ClickHouse table info.
+     *
+     * <p>For a table whose engine is {@code Distributed}, the create statement of the
+     * underlying local table is fetched and passed through {@link #localizationEngine};
+     * the result replaces the distributed table's own create statement.
+     *
+     * @param database database of the table.
+     * @param table    table name of the table.
+     * @return clickhouse table info.
+     * @throws RuntimeException if a query fails or returns no row
+     */
+    public ClickhouseTable getClickhouseTable(String database, String table) {
+        String sql = String.format("select engine,create_table_query,engine_full,data_paths from system.tables where database = '%s' and name = '%s'", database, table);
+        try (ClickHouseResponse response = clickhouseRequest.query(sql).executeAndWait()) {
+            List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
+            if (records.isEmpty()) {
+                throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty");
+            }
+            ClickHouseRecord record = records.get(0);
+            String engine = record.getValue(0).asString();
+            String createTableDDL = record.getValue(1).asString();
+            String engineFull = record.getValue(2).asString();
+            List<String> dataPaths = record.getValue(3).asTuple().stream().map(Object::toString).collect(Collectors.toList());
+            DistributedEngine distributedEngine = null;
+            if ("Distributed".equals(engine)) {
+                distributedEngine = getClickhouseDistributedTable(clickhouseRequest, database, table);
+                String localTableSQL = String.format("select engine,create_table_query from system.tables where database = '%s' and name = '%s'",
+                    distributedEngine.getDatabase(), distributedEngine.getTable());
+                try (ClickHouseResponse rs = clickhouseRequest.query(localTableSQL).executeAndWait()) {
+                    List<ClickHouseRecord> localTableRecords = rs.stream().collect(Collectors.toList());
+                    if (localTableRecords.isEmpty()) {
+                        throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty");
+                    }
+                    String localEngine = localTableRecords.get(0).getValue(0).asString();
+                    String createLocalTableDDL = localTableRecords.get(0).getValue(1).asString();
+                    createTableDDL = localizationEngine(localEngine, createLocalTableDDL);
+                }
+            }
+            return new ClickhouseTable(
+                database,
+                table,
+                distributedEngine,
+                engine,
+                createTableDDL,
+                engineFull,
+                dataPaths,
+                getClickhouseTableSchema(clickhouseRequest, table));
+        } catch (ClickHouseException e) {
+            throw new RuntimeException("Cannot get clickhouse table", e);
+        }
+
+    }
+
+    /**
+     * Localization the engine in clickhouse local table's createTableDDL to support specific engine.
+     * For example: change ReplicatedMergeTree to MergeTree.
+     *
+     * @param engine original engine of clickhouse local table
+     * @param ddl    createTableDDL of clickhouse local table
+     * @return createTableDDL of clickhouse local table which can support specific engine
+     * TODO: support more engine
+     */
+    public String localizationEngine(String engine, String ddl) {
+        if ("ReplicatedMergeTree".equalsIgnoreCase(engine)) {
+            // Drops the engine's argument list together with the "Replicated" prefix.
+            return ddl.replaceAll("ReplicatedMergeTree(\\([^\\)]*\\))", "MergeTree()");
+        } else {
+            return ddl;
+        }
+    }
+
+    /**
+     * Closes the default client and every per-shard client created so far.
+     */
+    public void close() {
+        if (this.client != null) {
+            this.client.close();
+        }
+        shardToDataSource.values().forEach(ClickHouseClient::close);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
new file mode 100644
index 000000000..295547c74
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.BULK_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_PREFIX;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SPLIT_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.clickhouse.client.ClickHouseNode;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * SeaTunnel sink plugin named {@code Clickhouse}.
+ *
+ * <p>{@link #prepare(Config)} validates the configuration, reads the target table's schema
+ * through a short-lived {@link ClickhouseProxy} and stores the result in a
+ * {@link ReaderOption}, which is later handed to every {@link ClickhouseSinkWriter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
+
+    private SeaTunnelContext seaTunnelContext;
+    private ReaderOption option;
+
+    @Override
+    public String getPluginName() {
+        return "Clickhouse";
+    }
+
+    /**
+     * Validates {@code config}, applies defaults for bulk size and split mode, resolves the
+     * table schema and builds the writer options.
+     *
+     * @param config plugin configuration
+     * @throws PrepareFailException     if a required option is missing
+     * @throws IllegalArgumentException if split mode is requested for a table whose engine is
+     *                                  not {@code Distributed}
+     * @throws RuntimeException         if a configured field is not part of the table schema
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, DATABASE, TABLE, USERNAME, PASSWORD);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        Map<String, Object> defaultConfig = ImmutableMap.<String, Object>builder()
+            .put(BULK_SIZE, 20_000)
+            .put(SPLIT_MODE, false)
+            .build();
+
+        config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
+
+        List<ClickHouseNode> nodes = ClickhouseUtil.createNodes(config.getString(HOST),
+            config.getString(DATABASE), config.getString(USERNAME), config.getString(PASSWORD));
+
+        Properties clickhouseProperties = new Properties();
+        if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) {
+            TypesafeConfigUtils.extractSubConfig(config, CLICKHOUSE_PREFIX, false).entrySet().forEach(e -> {
+                clickhouseProperties.put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
+            });
+        }
+        // Credentials are set last so they win over same-named keys from the sub config.
+        clickhouseProperties.put("user", config.getString(USERNAME));
+        clickhouseProperties.put("password", config.getString(PASSWORD));
+
+        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
+        // The proxy owns a client; release it even when a lookup or validation below throws.
+        try {
+            Map<String, String> tableSchema = proxy.getClickhouseTableSchema(config.getString(TABLE));
+            String shardKey = null;
+            String shardKeyType = null;
+            if (config.getBoolean(SPLIT_MODE)) {
+                ClickhouseTable table = proxy.getClickhouseTable(config.getString(DATABASE),
+                    config.getString(TABLE));
+                if (!"Distributed".equals(table.getEngine())) {
+                    throw new IllegalArgumentException("split mode only support table which engine is " +
+                        "'Distributed' engine at now");
+                }
+                if (config.hasPath(SHARDING_KEY)) {
+                    shardKey = config.getString(SHARDING_KEY);
+                    shardKeyType = tableSchema.get(shardKey);
+                }
+            }
+            ShardMetadata metadata = new ShardMetadata(
+                shardKey,
+                shardKeyType,
+                config.getString(DATABASE),
+                config.getString(TABLE),
+                config.getBoolean(SPLIT_MODE),
+                new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), config.getString(PASSWORD));
+            List<String> fields = new ArrayList<>();
+            if (config.hasPath(FIELDS)) {
+                fields.addAll(config.getStringList(FIELDS));
+                // check if the fields exist in schema
+                for (String field : fields) {
+                    if (!tableSchema.containsKey(field)) {
+                        throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE));
+                    }
+                }
+            } else {
+                // No explicit field list: write every column of the table.
+                fields.addAll(tableSchema.keySet());
+            }
+            this.option = new ReaderOption(metadata, clickhouseProperties, fields, tableSchema, config.getInt(BULK_SIZE));
+        } finally {
+            proxy.close();
+        }
+    }
+
+    /**
+     * Creates a writer that uses the options built by {@link #prepare(Config)}.
+     */
+    @Override
+    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> createWriter(SinkWriter.Context context) throws IOException {
+        return new ClickhouseSinkWriter(option, context);
+    }
+
+    /**
+     * Delegates to the default implementation of the {@link SeaTunnelSink} interface.
+     */
+    @Override
+    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> restoreWriter(SinkWriter.Context context, List<ClickhouseSinkState> states) throws IOException {
+        return SeaTunnelSink.super.restoreWriter(context, states);
+    }
+
+    @Override
+    public Optional<Serializer<ClickhouseSinkState>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    /**
+     * Stores the row type in the options object, so {@link #prepare(Config)} must have run
+     * before this method is called.
+     */
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.option.setSeaTunnelRowType(seaTunnelRowType);
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.option.getSeaTunnelRowType();
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
new file mode 100644
index 000000000..604f2c609
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ClickhouseFieldInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateTimeInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
+
+import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Sink writer that buffers rows in one JDBC batch per shard and executes a batch once it
+ * holds {@code bulkSize} rows, or when the writer is closed.
+ */
+public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSinkWriter.class);
+
+    private final SinkWriter.Context context;
+    private final ReaderOption option;
+    private final ShardRouter shardRouter;
+    private final transient ClickhouseProxy proxy;
+    private final String prepareSql;
+    private final Map<Shard, ClickhouseBatchStatement> statementMap;
+    private final Map<String, ClickhouseFieldInjectFunction> fieldInjectFunctionMap;
+    private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION = new StringInjectFunction();
+
+    private static final Pattern NULLABLE = Pattern.compile("Nullable\\((.*)\\)");
+    private static final Pattern LOW_CARDINALITY = Pattern.compile("LowCardinality\\((.*)\\)");
+
+    /**
+     * Opens a proxy to the default shard, resolves the shard layout and prepares one insert
+     * statement per shard.
+     *
+     * @param option  writer options built by the sink
+     * @param context writer context supplied by the engine
+     */
+    ClickhouseSinkWriter(ReaderOption option, SinkWriter.Context context) {
+        this.option = option;
+        this.context = context;
+
+        this.proxy = new ClickhouseProxy(option.getShardMetadata().getDefaultShard().getNode());
+        this.fieldInjectFunctionMap = initFieldInjectFunctionMap();
+        this.shardRouter = new ShardRouter(proxy, option.getShardMetadata());
+        // The insert SQL needs the shard table resolved by the router, so it comes after it.
+        this.prepareSql = initPrepareSQL();
+        this.statementMap = initStatementMap();
+    }
+
+    /**
+     * Adds {@code element} to the batch of the shard chosen by the router and executes that
+     * batch when it has reached the configured bulk size.
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+
+        Object shardKey = null;
+        if (StringUtils.isNotEmpty(this.option.getShardMetadata().getShardKey())) {
+            int i = this.option.getSeaTunnelRowType().indexOf(this.option.getShardMetadata().getShardKey());
+            shardKey = element.getField(i);
+        }
+        ClickhouseBatchStatement statement = statementMap.get(shardRouter.getShard(shardKey));
+        PreparedStatement clickHouseStatement = statement.getPreparedStatement();
+        IntHolder sizeHolder = statement.getIntHolder();
+        // add into batch
+        addIntoBatch(element, clickHouseStatement);
+        sizeHolder.setValue(sizeHolder.getValue() + 1);
+        // flush batch
+        if (sizeHolder.getValue() >= option.getBulkSize()) {
+            flush(clickHouseStatement);
+            sizeHolder.setValue(0);
+        }
+    }
+
+    /**
+     * Returns no commit info.
+     *
+     * <p>NOTE(review): pending batches are not executed here, only in {@link #write} and
+     * {@link #close()} - confirm this is intended for checkpointing.
+     */
+    @Override
+    public Optional<CKCommitInfo> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortPrepare() {
+
+    }
+
+    /**
+     * Closes the proxy, then executes any pending batch of every shard and closes its
+     * statement and connection.
+     *
+     * <p>A failure on one shard no longer stops the remaining shards from being flushed and
+     * closed: the first failure is rethrown once all shards were handled, and later failures
+     * are attached to it as suppressed exceptions.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            this.proxy.close();
+        } finally {
+            closeStatements();
+        }
+    }
+
+    /**
+     * Flushes and closes every per-shard statement and connection, visiting all shards before
+     * reporting the first failure.
+     */
+    private void closeStatements() {
+        RuntimeException firstFailure = null;
+        for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
+            try (ClickHouseConnectionImpl needClosedConnection = batchStatement.getClickHouseConnection();
+                 PreparedStatement needClosedStatement = batchStatement.getPreparedStatement()) {
+                IntHolder intHolder = batchStatement.getIntHolder();
+                if (intHolder.getValue() > 0) {
+                    flush(needClosedStatement);
+                    intHolder.setValue(0);
+                }
+            } catch (SQLException e) {
+                RuntimeException wrapped = new RuntimeException("Failed to close prepared statement.", e);
+                if (firstFailure == null) {
+                    firstFailure = wrapped;
+                } else {
+                    firstFailure.addSuppressed(wrapped);
+                }
+            } catch (RuntimeException e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                } else {
+                    firstFailure.addSuppressed(e);
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * Binds the configured fields of {@code row} to the statement parameters and adds the
+     * row to the statement's batch.
+     */
+    private void addIntoBatch(SeaTunnelRow row, PreparedStatement clickHouseStatement) {
+        try {
+            for (int i = 0; i < option.getFields().size(); i++) {
+                String fieldName = option.getFields().get(i);
+                Object fieldValue = row.getField(option.getSeaTunnelRowType().indexOf(fieldName));
+                if (fieldValue == null) {
+                    // field does not exist in row
+                    // todo: do we need to transform to default value of each type
+                    clickHouseStatement.setObject(i + 1, null);
+                    continue;
+                }
+                String fieldType = option.getTableSchema().get(fieldName);
+                // JDBC parameters are 1-based, hence i + 1.
+                fieldInjectFunctionMap
+                    .getOrDefault(fieldType, DEFAULT_INJECT_FUNCTION)
+                    .injectFields(clickHouseStatement, i + 1, fieldValue);
+            }
+            clickHouseStatement.addBatch();
+        } catch (SQLException e) {
+            throw new RuntimeException("Add row data into batch error", e);
+        }
+    }
+
+    /**
+     * Executes the batch of the given statement, wrapping any failure in a
+     * {@link RuntimeException}.
+     */
+    private void flush(PreparedStatement clickHouseStatement) {
+        try {
+            clickHouseStatement.executeBatch();
+        } catch (Exception e) {
+            throw new RuntimeException("Clickhouse execute batch statement error", e);
+        }
+    }
+
+    /**
+     * Opens one connection and prepared insert statement per shard known to the router.
+     */
+    private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
+        Map<Shard, ClickhouseBatchStatement> result = new HashMap<>(Common.COLLECTION_SIZE);
+        shardRouter.getShards().forEach((weight, s) -> {
+            try {
+                ClickHouseConnectionImpl clickhouseConnection = new ClickHouseConnectionImpl(s.getJdbcUrl(),
+                    this.option.getProperties());
+                PreparedStatement preparedStatement = clickhouseConnection.prepareStatement(prepareSql);
+                IntHolder intHolder = new IntHolder();
+                ClickhouseBatchStatement batchStatement =
+                    new ClickhouseBatchStatement(clickhouseConnection, preparedStatement, intHolder);
+                result.put(s, batchStatement);
+            } catch (SQLException e) {
+                throw new RuntimeException("Clickhouse prepare statement error: " + e.getMessage(), e);
+            }
+        });
+        return result;
+    }
+
+    /**
+     * Builds the parameterized insert statement for the configured fields.
+     */
+    private String initPrepareSQL() {
+        String[] placeholder = new String[option.getFields().size()];
+        Arrays.fill(placeholder, "?");
+
+        return String.format("INSERT INTO %s (%s) VALUES (%s)",
+            shardRouter.getShardTable(),
+            String.join(",", option.getFields()),
+            String.join(",", placeholder));
+    }
+
+    /**
+     * Maps every field type that occurs in the configured fields to the first inject function
+     * that accepts it, falling back to the string function.
+     */
+    private Map<String, ClickhouseFieldInjectFunction> initFieldInjectFunctionMap() {
+        Map<String, ClickhouseFieldInjectFunction> result = new HashMap<>(Common.COLLECTION_SIZE);
+        List<ClickhouseFieldInjectFunction> clickhouseFieldInjectFunctions = Lists.newArrayList(
+            new ArrayInjectFunction(),
+            new BigDecimalInjectFunction(),
+            new DateInjectFunction(),
+            new DateTimeInjectFunction(),
+            new DoubleInjectFunction(),
+            new FloatInjectFunction(),
+            new IntInjectFunction(),
+            new LongInjectFunction(),
+            new StringInjectFunction()
+        );
+        // get field type
+        for (String field : this.option.getFields()) {
+            ClickhouseFieldInjectFunction function = DEFAULT_INJECT_FUNCTION;
+            String fieldType = this.option.getTableSchema().get(field);
+            String unwrappedType = unwrapCommonPrefix(fieldType);
+            for (ClickhouseFieldInjectFunction clickhouseFieldInjectFunction : clickhouseFieldInjectFunctions) {
+                if (clickhouseFieldInjectFunction.isCurrentFieldType(unwrappedType)) {
+                    function = clickhouseFieldInjectFunction;
+                    break;
+                }
+            }
+            // Keyed by the full (wrapped) type, which is what addIntoBatch looks up.
+            result.put(fieldType, function);
+        }
+        return result;
+    }
+
+    /**
+     * Strips enclosing {@code Nullable(...)} and {@code LowCardinality(...)} wrappers from a
+     * type name, repeatedly, so that nested forms such as
+     * {@code LowCardinality(Nullable(String))} resolve to the innermost type.
+     *
+     * @param fieldType type name as reported by the table schema
+     * @return the type name without the wrappers
+     */
+    private String unwrapCommonPrefix(String fieldType) {
+        String current = fieldType;
+        // Every pass removes one wrapper and shortens the string, so the loop terminates.
+        while (true) {
+            Matcher nullMatcher = NULLABLE.matcher(current);
+            if (nullMatcher.matches()) {
+                current = nullMatcher.group(1);
+                continue;
+            }
+            Matcher lowMatcher = LOW_CARDINALITY.matcher(current);
+            if (lowMatcher.matches()) {
+                current = lowMatcher.group(1);
+                continue;
+            }
+            return current;
+        }
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
new file mode 100644
index 000000000..4471f8157
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+
+import com.clickhouse.client.ClickHouseRequest;
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Chooses the {@link Shard} a value should be written to.
+ *
+ * <p>When split mode is off, the single default shard from the {@link ShardMetadata} is always
+ * returned. When split mode is on, the shards of the cluster behind the distributed table are
+ * stored in a {@link TreeMap} keyed by their cumulative starting weight
+ * ({@code 0, w0, w0 + w1, ...}); a number {@code k} in {@code [1, totalWeight]} then selects the
+ * shard via {@code lowerEntry(k)}, so each shard is hit proportionally to its weight.
+ */
+public class ShardRouter implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    private String shardTable;
+    private final String table;
+    private int shardWeightCount;
+    private final TreeMap<Integer, Shard> shards;
+    private final String shardKey;
+    private final String shardKeyType;
+    private final boolean splitMode;
+
+    private static final XXHash64 HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+    /**
+     * Builds the routing table.
+     *
+     * @param proxy         used to resolve the distributed table and the cluster shard list in split mode
+     * @param shardMetadata shard key, table, credentials and default shard
+     * @throws IllegalArgumentException if a shard key is configured but its type is empty
+     */
+    public ShardRouter(ClickhouseProxy proxy, ShardMetadata shardMetadata) {
+        this.shards = new TreeMap<>();
+        this.shardKey = shardMetadata.getShardKey();
+        this.shardKeyType = shardMetadata.getShardKeyType();
+        this.splitMode = shardMetadata.getSplitMode();
+        this.table = shardMetadata.getTable();
+        if (StringUtils.isNotEmpty(shardKey) && StringUtils.isEmpty(shardKeyType)) {
+            throw new IllegalArgumentException("Shard key " + shardKey + " not found in table " + table);
+        }
+        ClickHouseRequest<?> connection = proxy.getClickhouseConnection();
+        if (splitMode) {
+            DistributedEngine localTable = proxy.getClickhouseDistributedTable(connection, shardMetadata.getDatabase(), table);
+            this.shardTable = localTable.getTable();
+            List<Shard> shardList = proxy.getClusterShardList(connection, localTable.getClusterName(),
+                localTable.getDatabase(), shardMetadata.getDefaultShard().getNode().getPort(),
+                shardMetadata.getUsername(), shardMetadata.getPassword());
+            // Key every shard by the sum of the weights of the shards before it.
+            int weight = 0;
+            for (Shard shard : shardList) {
+                shards.put(weight, shard);
+                weight += shard.getNode().getWeight();
+            }
+            shardWeightCount = weight;
+        } else {
+            shards.put(0, shardMetadata.getDefaultShard());
+        }
+    }
+
+    /**
+     * @return the table resolved from the distributed engine in split mode, otherwise the
+     *         configured table
+     */
+    public String getShardTable() {
+        return splitMode ? shardTable : table;
+    }
+
+    /**
+     * Picks the shard for the given shard-key value.
+     *
+     * @param shardValue value used for routing; may be {@code null}
+     * @return the only shard when split mode is off; a weighted random shard when no shard key is
+     *         configured or {@code shardValue} is {@code null}; otherwise the shard selected by
+     *         the hash of {@code shardValue.toString()}
+     * @throws IllegalStateException in split mode when the total shard weight is not positive
+     */
+    public Shard getShard(Object shardValue) {
+        if (!splitMode) {
+            return shards.firstEntry().getValue();
+        }
+        if (shardWeightCount <= 0) {
+            throw new IllegalStateException("No shard with a positive weight found for table " + table);
+        }
+        if (StringUtils.isEmpty(shardKey) || shardValue == null) {
+            // nextInt(bound) yields [0, bound); shifting by one gives [1, totalWeight] so that
+            // lowerEntry never receives 0 (which has no strictly lower key and would return null).
+            int pick = ThreadLocalRandom.current().nextInt(shardWeightCount) + 1;
+            return shards.lowerEntry(pick).getValue();
+        }
+        long hash = HASH_INSTANCE.hash(
+            ByteBuffer.wrap(shardValue.toString().getBytes(StandardCharsets.UTF_8)), 0);
+        // Clear the sign bit first, then reduce modulo the total weight. The parentheses matter:
+        // '%' binds tighter than '&', so without them the hash would merely be masked.
+        int offset = (int) ((hash & Long.MAX_VALUE) % shardWeightCount);
+        return shards.lowerEntry(offset + 1).getValue();
+    }
+
+    /**
+     * @return the internal routing map (cumulative starting weight to shard); not a copy
+     */
+    public TreeMap<Integer, Shard> getShards() {
+        return shards;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
new file mode 100644
index 000000000..05c511292
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_LOCAL_PATH;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.COPY_METHOD;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_PASS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.clickhouse.client.ClickHouseNode;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Sink plugin registered under the name {@code ClickhouseFile}. {@link #prepare(Config)} reads the
+ * plugin configuration and the target table schema and packs them into a
+ * {@link FileReaderOption}, which is handed to every {@link ClickhouseFileSinkWriter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
+
+    private SeaTunnelContext seaTunnelContext;
+    private FileReaderOption readerOption;
+
+    @Override
+    public String getPluginName() {
+        return "ClickhouseFile";
+    }
+
+    /**
+     * Validates the configuration, loads the table schema from the first configured node and
+     * builds the {@link FileReaderOption}.
+     *
+     * @param config plugin configuration
+     * @throws PrepareFailException if one of the mandatory options is missing
+     * @throws RuntimeException     if a configured field does not exist in the table schema
+     */
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH);
+        if (!checkResult.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, checkResult.getMsg());
+        }
+        Map<String, Object> defaultConfigs = ImmutableMap.<String, Object>builder()
+            .put(COPY_METHOD, ClickhouseFileCopyMethod.SCP.getName())
+            .build();
+
+        config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
+        List<ClickHouseNode> nodes = ClickhouseUtil.createNodes(config.getString(HOST),
+            config.getString(DATABASE), config.getString(USERNAME), config.getString(PASSWORD));
+
+        // The proxy is only needed for the schema lookup; release it right away, and do so in a
+        // finally block so a failing lookup cannot leak the connection.
+        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
+        Map<String, String> tableSchema;
+        try {
+            tableSchema = proxy.getClickhouseTableSchema(config.getString(TABLE));
+        } finally {
+            proxy.close();
+        }
+
+        String shardKey = null;
+        String shardKeyType = null;
+        if (config.hasPath(SHARDING_KEY)) {
+            shardKey = config.getString(SHARDING_KEY);
+            shardKeyType = tableSchema.get(shardKey);
+        }
+        ShardMetadata shardMetadata = new ShardMetadata(
+            shardKey,
+            shardKeyType,
+            config.getString(DATABASE),
+            config.getString(TABLE),
+            false, // we don't need to set splitMode in clickhouse file mode.
+            new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), config.getString(PASSWORD));
+        List<String> fields;
+        if (config.hasPath(FIELDS)) {
+            fields = config.getStringList(FIELDS);
+            // check if the fields exist in schema
+            for (String field : fields) {
+                if (!tableSchema.containsKey(field)) {
+                    throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE));
+                }
+            }
+        } else {
+            fields = new ArrayList<>(tableSchema.keySet());
+        }
+        // NOTE(review): NODE_PASS is read unconditionally although it is not part of the
+        // checkAllExists call above and has no default - confirm whether it should be optional.
+        Map<String, String> nodePassword = config.getObjectList(NODE_PASS).stream()
+            .collect(Collectors.toMap(configObject -> configObject.toConfig().getString(NODE_ADDRESS),
+                configObject -> configObject.toConfig().getString(PASSWORD)));
+
+        this.readerOption = new FileReaderOption(shardMetadata, tableSchema, fields, config.getString(CLICKHOUSE_LOCAL_PATH),
+            ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD)), nodePassword);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.readerOption.setSeaTunnelRowType(seaTunnelRowType);
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.readerOption.getSeaTunnelRowType();
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> createWriter(SinkWriter.Context context) throws IOException {
+        return new ClickhouseFileSinkWriter(readerOption, context);
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
new file mode 100644
index 000000000..c6b6bfa1f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.ClickHouseResponse;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Sink writer that buffers rows per shard and, on {@link #close()}, turns each buffer into
+ * ClickHouse data parts with the {@code clickhouse local} command, copies the parts to the
+ * shard's {@code detached/} directory and attaches them with {@code ALTER TABLE ... ATTACH PART}.
+ */
+public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileSinkWriter.class);
+    private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file";
+    private static final int UUID_LENGTH = 10;
+    private final FileReaderOption readerOption;
+    private final ShardRouter shardRouter;
+    private final ClickhouseProxy proxy;
+    private final ClickhouseTable clickhouseTable;
+    private final Map<Shard, List<String>> shardLocalDataPaths;
+    private final Map<Shard, List<SeaTunnelRow>> rowCache;
+
+    /**
+     * Opens a proxy to the default shard, resolves the target table and the data paths of every
+     * shard, and verifies that a password is available for each shard when required.
+     *
+     * @param readerOption options prepared by the sink
+     * @param context      writer context (not used by this writer)
+     */
+    public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Context context) {
+        this.readerOption = readerOption;
+        proxy = new ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode());
+        shardRouter = new ShardRouter(proxy, this.readerOption.getShardMetadata());
+        clickhouseTable = proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(),
+            this.readerOption.getShardMetadata().getTable());
+        rowCache = new HashMap<>(Common.COLLECTION_SIZE);
+
+        nodePasswordCheck();
+
+        // find file local save path of each node
+        shardLocalDataPaths = shardRouter.getShards().values().stream()
+            .collect(Collectors.toMap(Function.identity(), shard -> {
+                ClickhouseTable shardTable = proxy.getClickhouseTable(shard.getNode().getDatabase().get(),
+                    clickhouseTable.getLocalTableName());
+                return shardTable.getDataPaths();
+            }));
+    }
+
+    /** Buffers the row under the shard chosen by the router; nothing is written until close. */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Shard shard = shardRouter.getShard(element);
+        rowCache.computeIfAbsent(shard, k -> new ArrayList<>()).add(element);
+    }
+
+    /**
+     * Fails fast when passwords are required and a shard has no entry in the node password map,
+     * looked up by either its resolved host name or its configured host.
+     */
+    private void nodePasswordCheck() {
+        if (!this.readerOption.isNodeFreePass()) {
+            shardRouter.getShards().values().forEach(shard -> {
+                if (!this.readerOption.getNodePassword().containsKey(shard.getNode().getAddress().getHostName())
+                    && !this.readerOption.getNodePassword().containsKey(shard.getNode().getHost())) {
+                    throw new RuntimeException("Cannot find password of shard " + shard.getNode().getAddress().getHostName());
+                }
+            });
+        }
+    }
+
+    @Override
+    public Optional<CKCommitInfo> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortPrepare() {
+
+    }
+
+    /**
+     * Flushes every buffered shard and releases the proxy. The buffer is cleared afterwards so a
+     * repeated call does not attach the same rows twice.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            rowCache.forEach(this::flush);
+            rowCache.clear();
+        } finally {
+            proxy.close();
+        }
+    }
+
+    /**
+     * Generates the data parts for one shard, ships and attaches them, and removes the local
+     * working directory even when shipping fails.
+     */
+    private void flush(Shard shard, List<SeaTunnelRow> rows) {
+        try {
+            // generate clickhouse local file
+            // TODO generate file by sub rows to save memory
+            List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(rows);
+            if (clickhouseLocalFiles.isEmpty()) {
+                // only happens for an empty row list: nothing to ship, nothing to clean up
+                return;
+            }
+            try {
+                // move file to server
+                attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+            } finally {
+                // clear local file
+                clearLocalFileDirectory(clickhouseLocalFiles);
+            }
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Flush data into clickhouse file error", e);
+        } catch (Exception e) {
+            throw new RuntimeException("Flush data into clickhouse file error", e);
+        }
+    }
+
+    /**
+     * Writes the rows as tab separated text into a fresh working directory and runs
+     * {@code clickhouse local} on it to produce data parts.
+     *
+     * @return absolute paths of the generated part directories; empty only when {@code rows} is empty
+     * @throws RuntimeException if the expected output directory or any data part is missing
+     */
+    private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows) throws IOException,
+        InterruptedException {
+        if (rows.isEmpty()) {
+            return Collections.emptyList();
+        }
+        String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_");
+        String clickhouseLocalFile = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid);
+        FileUtils.forceMkdir(new File(clickhouseLocalFile));
+        String clickhouseLocalFileTmpFile = clickhouseLocalFile + "/local_data.log";
+        try (FileChannel fileChannel = FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE,
+            StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
+            // encode once; the byte length is needed for the mapping and for the write
+            byte[] payload = rows.stream()
+                .map(row -> this.readerOption.getFields().stream()
+                    .map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
+                    .collect(Collectors.joining("\t")))
+                .collect(Collectors.joining("\n"))
+                .getBytes(StandardCharsets.UTF_8);
+            MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
+                payload.length);
+            buffer.put(payload);
+        }
+
+        List<String> localPaths = Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" "))
+            .collect(Collectors.toList());
+        List<String> command = new ArrayList<>(localPaths);
+        if (localPaths.size() == 1) {
+            command.add("local");
+        }
+        command.add("--file");
+        command.add(clickhouseLocalFileTmpFile);
+        command.add("-S");
+        command.add("\"" + this.readerOption.getFields().stream().map(field -> field + " " + readerOption.getTableSchema().get(field)).collect(Collectors.joining(",")) + "\"");
+        command.add("-N");
+        command.add("\"" + "temp_table" + uuid + "\"");
+        command.add("-q");
+        command.add(String.format(
+            "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
+            clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() + ".", "").replaceAll("`", ""),
+            clickhouseTable.getLocalTableName(),
+            readerOption.getTableSchema().keySet().stream().map(s -> {
+                if (readerOption.getFields().contains(s)) {
+                    return s;
+                } else {
+                    return "NULL";
+                }
+            }).collect(Collectors.joining(",")),
+            uuid));
+        command.add("--path");
+        command.add("\"" + clickhouseLocalFile + "\"");
+        LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command));
+        ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command));
+        // Merge stderr into stdout so both are drained by the loop below; an unread stderr pipe
+        // can fill up and block the child process forever.
+        processBuilder.redirectErrorStream(true);
+        Process start = processBuilder.start();
+        // we just wait for the process to finish
+        try (InputStream inputStream = start.getInputStream();
+             InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+             BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
+            String line;
+            while ((line = bufferedReader.readLine()) != null) {
+                LOGGER.info(line);
+            }
+        }
+        int exitCode = start.waitFor();
+        if (exitCode != 0) {
+            LOGGER.warn("clickhouse local command exited with code {}", exitCode);
+        }
+        File file = new File(clickhouseLocalFile + "/data/_local/" + clickhouseTable.getLocalTableName());
+        if (!file.exists()) {
+            throw new RuntimeException("clickhouse local file not exists");
+        }
+        File[] files = file.listFiles();
+        if (files == null) {
+            throw new RuntimeException("clickhouse local file not exists");
+        }
+        List<String> parts = Arrays.stream(files)
+            .filter(File::isDirectory)
+            .filter(f -> !"detached".equals(f.getName()))
+            .map(File::getAbsolutePath).collect(Collectors.toList());
+        if (parts.isEmpty()) {
+            // rows were supplied but no part came out: report it here with a clear message
+            throw new RuntimeException("clickhouse local generated no data part under " + file.getAbsolutePath());
+        }
+        return parts;
+    }
+
+    /**
+     * Copies the part directories into the first data path's {@code detached/} directory of the
+     * shard and attaches every part to the local table.
+     *
+     * @throws RuntimeException if the configured copy method is not SCP
+     */
+    private void attachClickhouseLocalFileToServer(Shard shard, List<String> clickhouseLocalFiles) throws ClickHouseException {
+        if (!ClickhouseFileCopyMethod.SCP.equals(this.readerOption.getCopyMethod())) {
+            throw new RuntimeException("unsupported clickhouse file copy method " + readerOption.getCopyMethod());
+        }
+        String hostAddress = shard.getNode().getAddress().getHostName();
+        // Accept the same two keys nodePasswordCheck() accepts: resolved host name, then configured host.
+        String password = readerOption.getNodePassword().get(hostAddress);
+        if (password == null) {
+            password = readerOption.getNodePassword().get(shard.getNode().getHost());
+        }
+        FileTransfer fileTransfer = new ScpFileTransfer(hostAddress, password);
+        try {
+            fileTransfer.init();
+            fileTransfer.transferAndChown(clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) + "detached/");
+        } finally {
+            fileTransfer.close();
+        }
+
+        ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
+        for (String clickhouseLocalFile : clickhouseLocalFiles) {
+            String partName = clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 1);
+            try (ClickHouseResponse response = request.query(String.format("ALTER TABLE %s ATTACH PART '%s'",
+                clickhouseTable.getLocalTableName(), partName)).executeAndWait()) {
+                LOGGER.debug("Attached part {} to table {}", partName, clickhouseTable.getLocalTableName());
+            }
+        }
+    }
+
+    /**
+     * Deletes the per-flush working directory, derived from the first generated path by cutting
+     * it after the prefix and the uuid segment. Does nothing for an empty list.
+     */
+    private void clearLocalFileDirectory(List<String> clickhouseLocalFiles) {
+        if (clickhouseLocalFiles.isEmpty()) {
+            return;
+        }
+        String clickhouseLocalFile = clickhouseLocalFiles.get(0);
+        String localFileDir = clickhouseLocalFile.substring(0, CLICKHOUSE_LOCAL_FILE_PREFIX.length() + UUID_LENGTH + 1);
+        try {
+            File file = new File(localFileDir);
+            if (file.exists()) {
+                FileUtils.deleteDirectory(file);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to delete directory " + localFileDir, e);
+        }
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
new file mode 100644
index 000000000..a6c8e0fe0
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mutable description of a ClickHouse table: database, name, engine information, DDL, data
+ * paths and column schema, plus the optional {@link DistributedEngine} it was resolved from.
+ */
+public class ClickhouseTable {
+
+    private final DistributedEngine distributedEngine;
+    private String database;
+    private String tableName;
+    private String engine;
+    private String engineFull;
+    private String createTableDDL;
+    private List<String> dataPaths;
+    private Map<String, String> tableSchema;
+
+    public ClickhouseTable(String database,
+                           String tableName,
+                           DistributedEngine distributedEngine,
+                           String engine,
+                           String createTableDDL,
+                           String engineFull,
+                           List<String> dataPaths,
+                           Map<String, String> tableSchema) {
+        this.distributedEngine = distributedEngine;
+        this.database = database;
+        this.tableName = tableName;
+        this.engine = engine;
+        this.engineFull = engineFull;
+        this.createTableDDL = createTableDDL;
+        this.dataPaths = dataPaths;
+        this.tableSchema = tableSchema;
+    }
+
+    /**
+     * @return the table of the distributed engine when one is set, otherwise this table's own name
+     */
+    public String getLocalTableName() {
+        return distributedEngine == null ? tableName : distributedEngine.getTable();
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public String getEngine() {
+        return engine;
+    }
+
+    public void setEngine(String engine) {
+        this.engine = engine;
+    }
+
+    public String getEngineFull() {
+        return engineFull;
+    }
+
+    public void setEngineFull(String engineFull) {
+        this.engineFull = engineFull;
+    }
+
+    public String getCreateTableDDL() {
+        return createTableDDL;
+    }
+
+    public void setCreateTableDDL(String createTableDDL) {
+        this.createTableDDL = createTableDDL;
+    }
+
+    public List<String> getDataPaths() {
+        return dataPaths;
+    }
+
+    public void setDataPaths(List<String> dataPaths) {
+        this.dataPaths = dataPaths;
+    }
+
+    public Map<String, String> getTableSchema() {
+        return tableSchema;
+    }
+
+    public void setTableSchema(Map<String, String> tableSchema) {
+        this.tableSchema = tableSchema;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransfer.java
similarity index 67%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransfer.java
index 65b7af7c6..ca581f0ea 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransfer.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
+import java.util.List;
- public static final String DATABASE = "database";
+// Contract for shipping locally generated files to a target path; ScpFileTransfer implements it.
+// Call order used by the writer: init(), transferAndChown(...), close().
+public interface FileTransfer {
- public static final String SQL = "sql";
+ // Prepares the transfer channel; must be called before any transfer.
+ void init();
- public static final String USERNAME = "username";
+ // Copies one source path to targetPath and adjusts the owner of the copied files.
+ void transferAndChown(String sourcePath, String targetPath);
- public static final String PASSWORD = "password";
+ // Same as above for several source paths sharing one targetPath.
+ void transferAndChown(List<String> sourcePath, String targetPath);
+ // Releases whatever init() acquired.
+ void close();
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
new file mode 100644
index 000000000..6fa83794c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.scp.client.ScpClient;
+import org.apache.sshd.scp.client.ScpClientCreator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ScpFileTransfer implements FileTransfer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ScpFileTransfer.class);
+
+ private static final int SCP_PORT = 22;
+
+ private final String host;
+ private final String password;
+
+ private ScpClient scpClient;
+ private ClientSession clientSession;
+ private SshClient sshClient;
+
+ public ScpFileTransfer(String host, String password) {
+ this.host = host;
+ this.password = password;
+ }
+
+ @Override
+ public void init() {
+ try {
+ sshClient = SshClient.setUpDefaultClient();
+ sshClient.start();
+ clientSession = sshClient.connect("root", host, SCP_PORT).verify().getSession();
+ if (password != null) {
+ clientSession.addPasswordIdentity(password);
+ }
+ // TODO support add publicKey to identity
+ if (!clientSession.auth().verify().isSuccess()) {
+ throw new IOException("ssh host " + host + "authentication failed");
+ }
+ scpClient = ScpClientCreator.instance().createScpClient(clientSession);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to connect to host: " + host + " by user: root on port 22", e);
+ }
+ }
+
+ @Override
+ public void transferAndChown(String sourcePath, String targetPath) {
+ try {
+ scpClient.upload(
+ sourcePath,
+ targetPath,
+ ScpClient.Option.Recursive,
+ ScpClient.Option.TargetIsDirectory,
+ ScpClient.Option.PreserveAttributes);
+ } catch (IOException e) {
+ throw new RuntimeException("Scp failed to transfer file: " + sourcePath + " to: " + targetPath, e);
+ }
+ // remote exec command to change file owner. Only file owner equal with server's clickhouse user can
+ // make ATTACH command work.
+ List<String> command = new ArrayList<>();
+ command.add("ls");
+ command.add("-l");
+ command.add(targetPath.substring(0,
+ StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/");
+ command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
+ try {
+ String finalCommand = String.join(" ", command);
+ LOGGER.info("execute remote command: " + finalCommand);
+ clientSession.executeRemoteCommand(finalCommand);
+ } catch (IOException e) {
+ // always return error cause xargs return shell command result
+ }
+ }
+
+ @Override
+ public void transferAndChown(List<String> sourcePaths, String targetPath) {
+ if (sourcePaths == null) {
+ throw new IllegalArgumentException("sourcePath is null");
+ }
+ sourcePaths.forEach(sourcePath -> transferAndChown(sourcePath, targetPath));
+ }
+
+ @Override
+ public void close() {
+ if (clientSession != null && clientSession.isOpen()) {
+ try {
+ clientSession.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to close ssh session", e);
+ }
+ }
+ if (sshClient != null && sshClient.isOpen()) {
+ sshClient.stop();
+ try {
+ sshClient.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to close ssh client", e);
+ }
+ }
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
similarity index 56%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
index 65b7af7c6..c564e5501 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
@@ -15,21 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.regex.Pattern;
- public static final String SQL = "sql";
+public class ArrayInjectFunction implements ClickhouseFieldInjectFunction {
- public static final String USERNAME = "username";
+ private static final Pattern PATTERN = Pattern.compile("(Array.*)");
- public static final String PASSWORD = "password";
+ @Override
+ public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+ statement.setArray(index, (java.sql.Array) value);
+ }
+ @Override
+ public boolean isCurrentFieldType(String fieldType) {
+ return PATTERN.matcher(fieldType).matches();
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/BigDecimalInjectFunction.java
similarity index 55%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/BigDecimalInjectFunction.java
index 65b7af7c6..25c73ab3f 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/BigDecimalInjectFunction.java
@@ -15,21 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.regex.Pattern;
- public static final String SQL = "sql";
+public class BigDecimalInjectFunction implements ClickhouseFieldInjectFunction {
- public static final String USERNAME = "username";
+ private static final Pattern PATTERN = Pattern.compile("(Decimal.*)");
- public static final String PASSWORD = "password";
+ @Override
+ public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+ statement.setBigDecimal(index, (java.math.BigDecimal) value);
+ }
+ @Override
+ public boolean isCurrentFieldType(String fieldType) {
+ return PATTERN.matcher(fieldType).matches();
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java
new file mode 100644
index 000000000..3e27a6343
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Injects a field value into a ClickHouse statement, converting the Java type into the matching ClickHouse type.
+ */
+public interface ClickhouseFieldInjectFunction extends Serializable {
+
+ /**
+ * Inject the value into the statement.
+ *
+ * @param statement statement to inject into
+ * @param index index in the statement
+ * @param value value to inject
+ */
+ void injectFields(PreparedStatement statement, int index, Object value) throws SQLException;
+
+ /**
+ * Checks whether the given fieldType should be injected by the current function.
+ *
+ * @param fieldType field type to inject
+ * @return true if the fieldType needs to be injected by the current function
+ */
+ boolean isCurrentFieldType(String fieldType);
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateInjectFunction.java
similarity index 54%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateInjectFunction.java
index 65b7af7c6..7a0b0b648 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateInjectFunction.java
@@ -15,21 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
-
- public static final String SQL = "sql";
-
- public static final String USERNAME = "username";
-
- public static final String PASSWORD = "password";
-
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+public class DateInjectFunction implements ClickhouseFieldInjectFunction {
+ @Override
+ public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+ if (value instanceof Date) {
+ statement.setDate(index, (Date) value);
+ } else {
+ statement.setDate(index, Date.valueOf(value.toString()));
+ }
+ }
+
+ @Override
+ public boolean isCurrentFieldType(String fieldType) {
+ return "Date".equals(fieldType);
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
similarity index 53%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
index 65b7af7c6..b85c56afb 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
@@ -15,21 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
-
- public static final String SQL = "sql";
-
- public static final String USERNAME = "username";
-
- public static final String PASSWORD = "password";
-
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+public class DateTimeInjectFunction implements ClickhouseFieldInjectFunction {
+ @Override
+ public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+ if (value instanceof Timestamp) {
+ statement.setTimestamp(index, (Timestamp) value);
+ } else {
+ statement.setTimestamp(index, Timestamp.valueOf(value.toString()));
+ }
+ }
+
+ @Override
+ public boolean isCurrentFieldType(String fieldType) {
+ return "DateTime".equals(fieldType);
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DoubleInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DoubleInjectFunction.java
new file mode 100644
index 000000000..c416d110c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DoubleInjectFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+public class DoubleInjectFunction implements ClickhouseFieldInjectFunction {
+ @Override
+ public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+ if (value instanceof BigDecimal) {
+ statement.setDouble(index, ((BigDecimal) value).doubleValue());
+ } else {
+ statement.setDouble(index, (Double) value);
+ }
+ }
+
+ @Override
+ public boolean isCurrentFieldType(String fieldType) {
+ return "UInt32".equals(fieldType)
+ || "UInt64".equals(fieldType)
+ || "Int64".equals(fieldType)
+ || "Float64".equals(fieldType);
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/FloatInjectFunction.java
similarity index 53%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/FloatInjectFunction.java
index 65b7af7c6..84464808b 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/FloatInjectFunction.java
@@ -15,21 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
-
- public static final String SQL = "sql";
-
- public static final String USERNAME = "username";
-
- public static final String PASSWORD = "password";
-
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+public class FloatInjectFunction implements ClickhouseFieldInjectFunction {
+ @Override
+ public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+ if (value instanceof BigDecimal) {
+ statement.setFloat(index, ((BigDecimal) value).floatValue());
+ } else {
+ statement.setFloat(index, (Float) value);
+ }
+ }
+
+ @Override
+ public boolean isCurrentFieldType(String fieldType) {
+ return "Float32".equals(fieldType);
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/IntInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/IntInjectFunction.java
new file mode 100644
index 000000000..f6e8c27dc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/IntInjectFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+public class IntInjectFunction implements ClickhouseFieldInjectFunction {
+ @Override
+ public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+ if (value instanceof Byte) {
+ statement.setByte(index, (Byte) value);
+
+ } else if (value instanceof Short) {
+ statement.setShort(index, (Short) value);
+
+ } else {
+ statement.setInt(index, (Integer) value);
+
+ }
+ }
+
+ @Override
+ public boolean isCurrentFieldType(String fieldType) {
+ return "Int8".equals(fieldType)
+ || "UInt8".equals(fieldType)
+ || "Int16".equals(fieldType)
+ || "UInt16".equals(fieldType)
+ || "Int32".equals(fieldType);
+ }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/LongInjectFunction.java
similarity index 57%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/LongInjectFunction.java
index 65b7af7c6..ccd3e60b8 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/LongInjectFunction.java
@@ -15,21 +15,22 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
-
- public static final String SQL = "sql";
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
- public static final String USERNAME = "username";
+public class LongInjectFunction implements ClickhouseFieldInjectFunction {
- public static final String PASSWORD = "password";
+ @Override
+ public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+ statement.setLong(index, (Long) value);
+ }
+ @Override
+ public boolean isCurrentFieldType(String fieldType) {
+ return "UInt32".equals(fieldType)
+ || "UInt64".equals(fieldType)
+ || "Int64".equals(fieldType);
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
similarity index 61%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
index 65b7af7c6..4894774dc 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
@@ -15,21 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
-
- public static final String SQL = "sql";
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
- public static final String USERNAME = "username";
+public class StringInjectFunction implements ClickhouseFieldInjectFunction {
- public static final String PASSWORD = "password";
+ @Override
+ public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
+ statement.setString(index, value.toString());
+ }
+ @Override
+ public boolean isCurrentFieldType(String fieldType) {
+ return "String".equals(fieldType);
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index c5166d8b7..a94344e72 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS;
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SQL;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
@@ -37,22 +37,19 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.clickhouse.client.ClickHouseClient;
-import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
-import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseResponse;
import com.google.auto.service.AutoService;
-import java.util.Arrays;
import java.util.List;
-import java.util.stream.Collectors;
@AutoService(SeaTunnelSource.class)
public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit, ClickhouseSourceState> {
@@ -68,17 +65,12 @@ public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, Clickhous
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(config, NODE_ADDRESS, DATABASE, SQL, USERNAME, PASSWORD);
+ CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, DATABASE, SQL, USERNAME, PASSWORD);
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
}
- servers = Arrays.stream(config.getString(NODE_ADDRESS).split(",")).map(address -> {
- String[] nodeAndPort = address.split(":", 2);
- return ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP,
- Integer.parseInt(nodeAndPort[1])).database(config.getString(DATABASE))
- .credentials(ClickHouseCredentials.fromUserAndPassword(config.getString(USERNAME),
- config.getString(PASSWORD))).build();
- }).collect(Collectors.toList());
+ servers = ClickhouseUtil.createNodes(config.getString(HOST), config.getString(DATABASE),
+ config.getString(USERNAME), config.getString(PASSWORD));
sql = config.getString(SQL);
try (ClickHouseClient client = ClickHouseClient.newInstance(servers.get(0).getProtocol());
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
index ce07b3412..66d0621df 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java
@@ -33,6 +33,7 @@ public class ClickhouseSourceSplitEnumerator implements
private final Set<Integer> readers;
private volatile int assigned = -1;
+ // TODO: support reading from the Distributed engine using multiple splits
ClickhouseSourceSplitEnumerator(Context<ClickhouseSourceSplit> enumeratorContext) {
this.context = enumeratorContext;
this.readers = new HashSet<>();
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKAggCommitInfo.java
similarity index 66%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKAggCommitInfo.java
index 65b7af7c6..2de15ac9c 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKAggCommitInfo.java
@@ -15,21 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
-
- public static final String SQL = "sql";
-
- public static final String USERNAME = "username";
-
- public static final String PASSWORD = "password";
+import java.io.Serializable;
+public class CKAggCommitInfo implements Serializable {
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKCommitInfo.java
similarity index 66%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKCommitInfo.java
index 65b7af7c6..99464801d 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKCommitInfo.java
@@ -15,21 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
-
- public static final String SQL = "sql";
-
- public static final String USERNAME = "username";
-
- public static final String PASSWORD = "password";
+import java.io.Serializable;
+public class CKCommitInfo implements Serializable {
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSinkState.java
similarity index 66%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSinkState.java
index 65b7af7c6..28d9dc2ed 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSinkState.java
@@ -15,21 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String DATABASE = "database";
-
- public static final String SQL = "sql";
-
- public static final String USERNAME = "username";
-
- public static final String PASSWORD = "password";
+import java.io.Serializable;
+/**
+ * Empty serializable type in the ClickHouse connector's {@code state} package;
+ * it declares no state of its own yet.
+ */
+public class ClickhouseSinkState implements Serializable {
+
+    // Explicit version id so the serialized form does not depend on the
+    // compiler-generated default, which can change when the class is edited.
+    private static final long serialVersionUID = -1L;
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
similarity index 67%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
index 65b7af7c6..02e7be596 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java
@@ -15,21 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.tool;
-/**
- * The config of clickhouse
- */
-public class Config {
-
- public static final String NODE_ADDRESS = "node_address";
+import java.io.Serializable;
- public static final String DATABASE = "database";
+/**
+ * Serializable, mutable holder for a single {@code int} value.
+ */
+public class IntHolder implements Serializable {
- public static final String SQL = "sql";
+ private static final long serialVersionUID = -1L;
- public static final String USERNAME = "username";
+ // The held value; 0 until setValue is called (Java default for an int field).
+ private int value;
- public static final String PASSWORD = "password";
+ /**
+ * @return the value currently held
+ */
+ public int getValue() {
+ return value;
+ }
+ /**
+ * Replaces the held value.
+ *
+ * @param value the new value to hold
+ */
+ public void setValue(int value) {
+ this.value = value;
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
new file mode 100644
index 000000000..38c835831
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
+
+import com.clickhouse.client.ClickHouseCredentials;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseProtocol;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Helpers for turning connector configuration into ClickHouse client nodes.
+ */
+public class ClickhouseUtil {
+
+    /**
+     * Builds one {@link ClickHouseNode} per entry of a comma-separated {@code host:port} list.
+     * Every node uses the HTTP protocol and shares the given database and credentials.
+     *
+     * @param nodeAddress comma-separated {@code host:port} entries; whitespace around an entry,
+     *                    its host and its port is ignored
+     * @param database    database set on every node
+     * @param username    user name used for the node credentials
+     * @param password    password used for the node credentials
+     * @return the nodes, in the order their entries appear in {@code nodeAddress}
+     * @throws IllegalArgumentException if an entry has no {@code :port} part
+     * @throws NumberFormatException    if the port of an entry is not an integer
+     */
+    public static List<ClickHouseNode> createNodes(String nodeAddress, String database, String username,
+                                                   String password) {
+        return Arrays.stream(nodeAddress.split(","))
+            .map(address -> createNode(address, database, username, password))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Parses a single {@code host:port} entry and builds the HTTP node for it.
+     */
+    private static ClickHouseNode createNode(String address, String database, String username,
+                                             String password) {
+        // Limit 2: only the first ':' separates the host from the port.
+        String[] hostAndPort = address.trim().split(":", 2);
+        if (hostAndPort.length < 2) {
+            // Fail with the offending entry instead of a bare ArrayIndexOutOfBoundsException.
+            throw new IllegalArgumentException(
+                "Invalid ClickHouse node address '" + address + "', expected format host:port");
+        }
+        // Trim both parts so "host : 8123" and "a:8123 ,b:8123" parse as intended.
+        String host = hostAndPort[0].trim();
+        int port = Integer.parseInt(hostAndPort[1].trim());
+        return ClickHouseNode.builder()
+            .host(host)
+            .port(ClickHouseProtocol.HTTP, port)
+            .database(database)
+            .credentials(ClickHouseCredentials.fromUserAndPassword(username, password))
+            .build();
+    }
+
+}
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 091b8ca9a..dd1a4432c 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -354,7 +354,6 @@ The text of each license is the standard Apache 2.0 license.
(Apache License, Version 2.0) Apache Commons Email (org.apache.commons:commons-email:1.5 - http://commons.apache.org/proper/commons-email/)
(Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.11.0 - https://commons.apache.org/proper/commons-io/)
(Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.5 - http://commons.apache.org/proper/commons-io/)
- (Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.8.0 - https://commons.apache.org/proper/commons-io/)
(Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.4 - http://commons.apache.org/proper/commons-lang/)
(Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.5 - http://commons.apache.org/proper/commons-lang/)
(Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.6 - http://commons.apache.org/proper/commons-lang/)
@@ -421,6 +420,7 @@ The text of each license is the standard Apache 2.0 license.
(Apache License, Version 2.0) Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.6 - http://hc.apache.org/httpcomponents-client)
(Apache License, Version 2.0) Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.9 - http://hc.apache.org/httpcomponents-client)
(Apache License, Version 2.0) Apache HttpClient Mime (org.apache.httpcomponents:httpmime:4.5.2 - http://hc.apache.org/httpcomponents-client)
+ (Apache License, Version 2.0) Apache HttpClient Mime (org.apache.httpcomponents:httpmime:4.5.13 - http://hc.apache.org/httpcomponents-client)
(Apache License, Version 2.0) Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.10 - http://hc.apache.org/httpcomponents-core-ga)
(Apache License, Version 2.0) Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.11 - http://hc.apache.org/httpcomponents-core-ga)
(Apache License, Version 2.0) Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.12 - http://hc.apache.org/httpcomponents-core-ga)
@@ -760,6 +760,7 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Google HTTP Client Library for Java (com.google.http-client:google-http-client:1.26.0 - https://github.com/googleapis/google-http-java-client/google-http-client)
(The Apache Software License, Version 2.0) Google OAuth Client Library for Java (com.google.oauth-client:google-oauth-client:1.26.0 - https://github.com/googleapis/google-oauth-java-client/google-oauth-client)
(The Apache Software License, Version 2.0) Gson (com.google.code.gson:gson:2.2.4 - http://code.google.com/p/google-gson/)
+ (The Apache Software License, Version 2.0) Gson (com.google.code.gson:gson:2.9.0 - http://code.google.com/p/google-gson/)
(The Apache Software License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava)
(The Apache Software License, Version 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)
(The Apache Software License, Version 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc)
@@ -833,7 +834,9 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) aggs-matrix-stats (org.elasticsearch.plugin:aggs-matrix-stats-client:7.5.1 - https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) cli (org.elasticsearch:elasticsearch-cli:6.3.1 - https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) clickhouse-client (com.clickhouse:clickhouse-client:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc)
+ (The Apache Software License, Version 2.0) clickhouse-grpc-client (com.clickhouse:clickhouse-grpc-client:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc)
(The Apache Software License, Version 2.0) clickhouse-http-client (com.clickhouse:clickhouse-http-client:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc)
+ (The Apache Software License, Version 2.0) clickhouse-jdbc (com.clickhouse:clickhouse-jdbc:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc)
(The Apache Software License, Version 2.0) clickhouse-jdbc (ru.yandex.clickhouse:clickhouse-jdbc:0.2 - https://github.com/yandex/clickhouse-jdbc)
(The Apache Software License, Version 2.0) elasticsearch-cli (org.elasticsearch:elasticsearch-cli:7.5.1 - https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) elasticsearch-core (org.elasticsearch:elasticsearch-core:6.3.1 - https://github.com/elastic/elasticsearch)
diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE
index 4f0186eb7..fe3c949f6 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -4377,4 +4377,16 @@ Copyright 2017-2021 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+=========================================================================
+
+Apache HttpClient Mime NOTICE
+
+=========================================================================
+
+Apache HttpClient Mime
+Copyright 1999-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
=========================================================================
\ No newline at end of file
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index cc8427d9d..edbd78812 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -62,14 +62,14 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
return pluginIdentifiers.stream()
.map(this::getPluginJarPath)
.filter(Optional::isPresent)
- .map(Optional::get)
+ .map(Optional::get).distinct()
.collect(Collectors.toList());
}
+/**
+ * Resolves each identifier through {@code getPluginInstance} and returns the
+ * resulting plugins with duplicates (as decided by {@code Stream.distinct()},
+ * i.e. {@code equals}) removed.
+ *
+ * @param pluginIdentifiers identifiers of the plugins to resolve
+ * @return the distinct plugin instances collected into a list
+ */
@Override
public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
return pluginIdentifiers.stream()
- .map(this::getPluginInstance)
+ .map(this::getPluginInstance).distinct()
.collect(Collectors.toList());
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
index 09ab6d239..bcd3d7887 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.MutableLong;
import org.apache.spark.sql.catalyst.expressions.MutableShort;
import org.apache.spark.sql.catalyst.expressions.MutableValue;
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
import java.sql.Date;
@@ -77,6 +78,8 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
return Timestamp.valueOf((LocalDateTime) field);
case MAP:
return convertMap((Map<?, ?>) field, (MapType<?, ?>) dataType, InternalRowConverter::convert);
+ case STRING:
+ return UTF8String.fromString((String) field);
default:
return field;
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index ec31649e7..6674db544 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -78,6 +78,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
}
SparkWriterCommitMessage<CommitInfoT> sparkWriterCommitMessage = new SparkWriterCommitMessage<>(latestCommitInfoT);
cleanCommitInfo();
+ sinkWriter.close();
return sparkWriterCommitMessage;
}
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index c3327b428..5ad2f9439 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -65,8 +65,11 @@ chill-java-0.9.3.jar
chill_2.11-0.9.3.jar
classmate-1.1.0.jar
clickhouse-client-0.3.2-patch9.jar
+clickhouse-grpc-client-0.3.2-patch9-netty.jar
+clickhouse-http-client-0.3.2-patch9-shaded.jar
clickhouse-http-client-0.3.2-patch9.jar
clickhouse-jdbc-0.2.jar
+clickhouse-jdbc-0.3.2-patch9.jar
commons-beanutils-1.9.3.jar
commons-cli-1.2.jar
commons-cli-1.3.1.jar
@@ -186,6 +189,7 @@ google-http-client-1.26.0.jar
google-http-client-jackson2-1.26.0.jar
google-oauth-client-1.26.0.jar
gson-2.2.4.jar
+gson-2.9.0.jar
guava-19.0.jar
guice-3.0.jar
guice-4.1.0.jar
@@ -264,6 +268,7 @@ httpasyncclient-4.1.4.jar
httpclient-4.5.13.jar
httpcore-4.4.4.jar
httpcore-nio-4.4.4.jar
+httpmime-4.5.13.jar
httpmime-4.5.2.jar
hudi-spark-bundle_2.11-0.10.0.jar
i18n-util-1.0.4.jar