You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2023/01/09 03:05:45 UTC
[incubator-seatunnel] branch dev updated: [Improve] [Connector-V2] Remove Clickhouse Fields Config (#3826)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 74704c362 [Improve] [Connector-V2] Remove Clickhouse Fields Config (#3826)
74704c362 is described below
commit 74704c362ac11e8a620927c3ce7b2c972f3b2e96
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Jan 9 11:05:41 2023 +0800
[Improve] [Connector-V2] Remove Clickhouse Fields Config (#3826)
---
docs/en/connector-v2/sink/Clickhouse.md | 9 ++----
.../clickhouse/config/ClickhouseConfig.java | 6 ----
.../seatunnel/clickhouse/config/ReaderOption.java | 2 --
.../clickhouse/sink/ClickhouseSinkFactory.java | 2 --
.../clickhouse/sink/client/ClickhouseSink.java | 16 -----------
.../sink/client/ClickhouseSinkWriter.java | 1 -
.../executor/BufferedBatchStatementExecutor.java | 4 +--
.../JdbcBatchStatementExecutorBuilder.java | 18 +++++-------
.../clickhouse/sink/client/executor/SqlUtils.java | 2 +-
.../clickhouse/sink/file/ClickhouseFileSink.java | 15 +---------
.../sink/file/ClickhouseFileSinkFactory.java | 3 +-
.../seatunnel/clickhouse/ClickhouseIT.java | 5 +---
.../test/resources/clickhouse_to_clickhouse.conf | 32 ----------------------
13 files changed, 15 insertions(+), 100 deletions(-)
diff --git a/docs/en/connector-v2/sink/Clickhouse.md b/docs/en/connector-v2/sink/Clickhouse.md
index d82b0417a..f01ccd6c7 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -29,7 +29,6 @@ Write data to Clickhouse can also be done using JDBC
| table | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
-| fields | string | yes | - |
| clickhouse.config | map | no | |
| bulk_size | string | no | 20000 |
| split_mode | string | no | false |
@@ -59,10 +58,6 @@ The table name
`ClickHouse` user password
-### fields [array]
-
-The data field that needs to be output to `ClickHouse` , if not configured, it will be automatically adapted according to the sink table `schema` .
-
### clickhouse.config [map]
In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc` .
@@ -188,4 +183,6 @@ sink {
- [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
- [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
-- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
+
+- [Improve] Remove Clickhouse Fields Config ([3826](https://github.com/apache/incubator-seatunnel/pull/3826))
+- [Improve] Change Connector Custom Config Prefix To Map ([3719](https://github.com/apache/incubator-seatunnel/pull/3719))
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index ecaef394a..d9184c1cf 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -33,12 +33,6 @@ public class ClickhouseConfig {
public static final Option<Integer> BULK_SIZE = Options.key("bulk_size").intType()
.defaultValue(20000).withDescription("Bulk size of clickhouse jdbc");
- /**
- * Clickhouse fields
- */
- public static final Option<String> FIELDS = Options.key("fields").stringType()
- .noDefaultValue().withDescription("Clickhouse fields");
-
public static final Option<String> SQL = Options.key("sql").stringType()
.noDefaultValue().withDescription("Clickhouse sql used to query data");
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
index 59f711741..ac8a80f06 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
@@ -25,7 +25,6 @@ import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -34,7 +33,6 @@ import java.util.Properties;
public class ReaderOption implements Serializable {
private ShardMetadata shardMetadata;
- private List<String> fields;
private String[] primaryKeys;
private boolean allowExperimentalLightweightDelete;
private boolean supportUpsert;
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
index 0e343a7d8..fe6857650 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
@@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
@@ -51,7 +50,6 @@ public class ClickhouseSinkFactory implements TableSinkFactory {
.optional(CLICKHOUSE_CONFIG,
BULK_SIZE,
SPLIT_MODE,
- FIELDS,
SHARDING_KEY,
PRIMARY_KEY,
SUPPORT_UPSERT,
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 13f0c49d1..c95b0112f 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
@@ -45,7 +44,6 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
@@ -63,7 +61,6 @@ import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -161,18 +158,6 @@ public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSin
new Shard(1, 1, nodes.get(0)));
}
- List<String> fields = new ArrayList<>();
- if (config.hasPath(FIELDS.key())) {
- fields.addAll(config.getStringList(FIELDS.key()));
- // check if the fields exist in schema
- for (String field : fields) {
- if (!tableSchema.containsKey(field)) {
- throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key()));
- }
- }
- } else {
- fields.addAll(tableSchema.keySet());
- }
proxy.close();
String[] primaryKeys = null;
@@ -195,7 +180,6 @@ public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSin
this.option = ReaderOption.builder()
.shardMetadata(metadata)
.properties(clickhouseProperties)
- .fields(fields)
.tableEngine(table.getEngine())
.tableSchema(tableSchema)
.bulkSize(config.getInt(BULK_SIZE.key()))
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index deb015453..f1f7fa531 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -135,7 +135,6 @@ public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitIn
.setRowType(option.getSeaTunnelRowType())
.setPrimaryKeys(option.getPrimaryKeys())
.setClickhouseTableSchema(option.getTableSchema())
- .setProjectionFields(option.getFields().toArray(new String[0]))
.setAllowExperimentalLightweightDelete(option.isAllowExperimentalLightweightDelete())
.setSupportUpsert(option.isSupportUpsert())
.build();
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
index 7b5a4d249..2f10565c3 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
@@ -63,8 +63,6 @@ public class BufferedBatchStatementExecutor implements JdbcBatchStatementExecuto
if (!buffer.isEmpty()) {
executeBatch();
}
- if (statementExecutor != null) {
- statementExecutor.closeStatements();
- }
+ statementExecutor.closeStatements();
}
}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
index cf2508841..a13681d3a 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
@@ -40,7 +40,6 @@ public class JdbcBatchStatementExecutorBuilder {
private String tableEngine;
private SeaTunnelRowType rowType;
private String[] primaryKeys;
- private String[] projectionFields;
private Map<String, String> clickhouseTableSchema;
private boolean supportUpsert;
private boolean allowExperimentalLightweightDelete;
@@ -53,15 +52,15 @@ public class JdbcBatchStatementExecutorBuilder {
private boolean supportReplacingMergeTreeTableUpsert() {
return tableEngine.endsWith(REPLACING_MERGE_TREE_ENGINE_SUFFIX)
- && Objects.equals(primaryKeys, orderByKeys);
+ && Arrays.equals(primaryKeys, orderByKeys);
}
private String[] getDefaultProjectionFields() {
List<String> fieldNames = Arrays.asList(rowType.getFieldNames());
return clickhouseTableSchema.keySet()
.stream()
- .filter(field -> fieldNames.contains(field))
- .toArray(value -> new String[0]);
+ .filter(fieldNames::contains)
+ .toArray(String[]::new);
}
public JdbcBatchStatementExecutor build() {
@@ -69,12 +68,9 @@ public class JdbcBatchStatementExecutorBuilder {
Objects.requireNonNull(tableEngine);
Objects.requireNonNull(rowType);
Objects.requireNonNull(clickhouseTableSchema);
- if (projectionFields == null) {
- projectionFields = getDefaultProjectionFields();
- }
JdbcRowConverter valueRowConverter = new JdbcRowConverter(
- rowType, clickhouseTableSchema, projectionFields);
+ rowType, clickhouseTableSchema, getDefaultProjectionFields());
if (primaryKeys == null || primaryKeys.length == 0) {
// INSERT: write all events when primary-keys is empty
return createInsertBufferedExecutor(table, rowType, valueRowConverter);
@@ -192,8 +188,8 @@ public class JdbcBatchStatementExecutorBuilder {
private static SeaTunnelDataType[] getKeyTypes(int[] pkFields, SeaTunnelRowType rowType) {
return Arrays.stream(pkFields)
- .mapToObj((IntFunction<SeaTunnelDataType>) index -> rowType.getFieldType(index))
- .toArray(length -> new SeaTunnelDataType[length]);
+ .mapToObj((IntFunction<SeaTunnelDataType>) rowType::getFieldType)
+ .toArray(SeaTunnelDataType[]::new);
}
private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
@@ -205,7 +201,7 @@ public class JdbcBatchStatementExecutorBuilder {
SeaTunnelRow newRow = new SeaTunnelRow(fields);
newRow.setTableId(row.getTableId());
newRow.setRowKind(row.getRowKind());
- return row;
+ return newRow;
};
}
}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
index 448eaca2a..0360bc72f 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
@@ -30,7 +30,7 @@ public class SqlUtils {
public static String getInsertIntoStatement(String tableName,
String[] fieldNames) {
String columns = Arrays.stream(fieldNames)
- .map(fieldName -> quoteIdentifier(fieldName))
+ .map(SqlUtils::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders = Arrays.stream(fieldNames)
.map(fieldName -> "?")
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 1da905450..192ad6815 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -48,7 +47,6 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
@@ -120,18 +118,7 @@ public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, Clickhous
table.getEngine(),
false, // we don't need to set splitMode in clickhouse file mode.
new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
- List<String> fields;
- if (config.hasPath(FIELDS.key())) {
- fields = config.getStringList(FIELDS.key());
- // check if the fields exist in schema
- for (String field : fields) {
- if (!tableSchema.containsKey(field)) {
- throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key()));
- }
- }
- } else {
- fields = new ArrayList<>(tableSchema.keySet());
- }
+ List<String> fields = new ArrayList<>(tableSchema.keySet());
Map<String, String> nodeUser = config.getObjectList(NODE_PASS.key()).stream()
.collect(Collectors.toMap(configObject -> configObject.toConfig().getString(NODE_ADDRESS),
configObject -> configObject.toConfig().hasPath(USERNAME.key()) ? configObject.toConfig().getString(USERNAME.key()) : "root"));
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
index 829d8e005..fc5b2ecca 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
@@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -48,6 +47,6 @@ public class ClickhouseFileSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH)
- .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
+ .optional(COPY_METHOD, SHARDING_KEY, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index 05ca6d359..c4baa7871 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -368,10 +368,7 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource {
private Boolean compare(String sql) {
try (Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery(sql);
- while (resultSet.next()) {
- return false;
- }
- return true;
+ return !resultSet.next();
} catch (SQLException e) {
throw new RuntimeException("result compare error", e);
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf
index 41ecae569..1137c7c93 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf
@@ -43,38 +43,6 @@ sink {
host = "clickhouse:8123"
database = "default"
table = "sink_table"
- fields = [
- "id",
- "c_map",
- "c_array_string",
- "c_array_short",
- "c_array_int",
- "c_array_long",
- "c_array_float",
- "c_array_double",
- "c_string",
- "c_boolean",
- "c_int8",
- "c_int16",
- "c_int32",
- "c_int64",
- "c_float32",
- "c_float64",
- "c_decimal",
- "c_date",
- "c_datetime",
- "c_nullable",
- "c_lowcardinality",
- "c_nested.int",
- "c_nested.double",
- "c_nested.string",
- "c_int128",
- "c_uint128",
- "c_int256",
- "c_uint256",
- "c_point",
- "c_ring"
- ]
username = "default"
password = ""
}