You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2023/01/09 03:05:45 UTC

[incubator-seatunnel] branch dev updated: [Improve] [Connector-V2] Remove Clickhouse Fields Config (#3826)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 74704c362 [Improve] [Connector-V2] Remove Clickhouse Fields Config (#3826)
74704c362 is described below

commit 74704c362ac11e8a620927c3ce7b2c972f3b2e96
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Jan 9 11:05:41 2023 +0800

    [Improve] [Connector-V2] Remove Clickhouse Fields Config (#3826)
---
 docs/en/connector-v2/sink/Clickhouse.md            |  9 ++----
 .../clickhouse/config/ClickhouseConfig.java        |  6 ----
 .../seatunnel/clickhouse/config/ReaderOption.java  |  2 --
 .../clickhouse/sink/ClickhouseSinkFactory.java     |  2 --
 .../clickhouse/sink/client/ClickhouseSink.java     | 16 -----------
 .../sink/client/ClickhouseSinkWriter.java          |  1 -
 .../executor/BufferedBatchStatementExecutor.java   |  4 +--
 .../JdbcBatchStatementExecutorBuilder.java         | 18 +++++-------
 .../clickhouse/sink/client/executor/SqlUtils.java  |  2 +-
 .../clickhouse/sink/file/ClickhouseFileSink.java   | 15 +---------
 .../sink/file/ClickhouseFileSinkFactory.java       |  3 +-
 .../seatunnel/clickhouse/ClickhouseIT.java         |  5 +---
 .../test/resources/clickhouse_to_clickhouse.conf   | 32 ----------------------
 13 files changed, 15 insertions(+), 100 deletions(-)

diff --git a/docs/en/connector-v2/sink/Clickhouse.md b/docs/en/connector-v2/sink/Clickhouse.md
index d82b0417a..f01ccd6c7 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -29,7 +29,6 @@ Write data to Clickhouse can also be done using JDBC
 | table                                 | string  | yes      | -             |
 | username                              | string  | yes      | -             |
 | password                              | string  | yes      | -             |
-| fields                                | string  | yes      | -             |
 | clickhouse.config                     | map     | no       |               |
 | bulk_size                             | string  | no       | 20000         |
 | split_mode                            | string  | no       | false         |
@@ -59,10 +58,6 @@ The table name
 
 `ClickHouse` user password
 
-### fields [array]
-
-The data field that needs to be output to `ClickHouse` , if not configured, it will be automatically adapted according to the sink table `schema` .
-
 ### clickhouse.config [map]
 
 In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc` .
@@ -188,4 +183,6 @@ sink {
 - [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
 - [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
 - [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
-- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
+
+- [Improve] Remove Clickhouse Fields Config ([3826](https://github.com/apache/incubator-seatunnel/pull/3826))
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index ecaef394a..d9184c1cf 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -33,12 +33,6 @@ public class ClickhouseConfig {
     public static final Option<Integer> BULK_SIZE = Options.key("bulk_size").intType()
         .defaultValue(20000).withDescription("Bulk size of clickhouse jdbc");
 
-    /**
-     * Clickhouse fields
-     */
-    public static final Option<String> FIELDS = Options.key("fields").stringType()
-        .noDefaultValue().withDescription("Clickhouse fields");
-
     public static final Option<String> SQL = Options.key("sql").stringType()
         .noDefaultValue().withDescription("Clickhouse sql used to query data");
 
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
index 59f711741..ac8a80f06 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
@@ -25,7 +25,6 @@ import lombok.Getter;
 import lombok.Setter;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -34,7 +33,6 @@ import java.util.Properties;
 public class ReaderOption implements Serializable {
 
     private ShardMetadata shardMetadata;
-    private List<String> fields;
     private String[] primaryKeys;
     private boolean allowExperimentalLightweightDelete;
     private boolean supportUpsert;
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
index 0e343a7d8..fe6857650 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
@@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
@@ -51,7 +50,6 @@ public class ClickhouseSinkFactory implements TableSinkFactory {
             .optional(CLICKHOUSE_CONFIG,
                 BULK_SIZE,
                 SPLIT_MODE,
-                FIELDS,
                 SHARDING_KEY,
                 PRIMARY_KEY,
                 SUPPORT_UPSERT,
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 13f0c49d1..c95b0112f 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
@@ -45,7 +44,6 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
@@ -63,7 +61,6 @@ import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -161,18 +158,6 @@ public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSin
                 new Shard(1, 1, nodes.get(0)));
         }
 
-        List<String> fields = new ArrayList<>();
-        if (config.hasPath(FIELDS.key())) {
-            fields.addAll(config.getStringList(FIELDS.key()));
-            // check if the fields exist in schema
-            for (String field : fields) {
-                if (!tableSchema.containsKey(field)) {
-                    throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key()));
-                }
-            }
-        } else {
-            fields.addAll(tableSchema.keySet());
-        }
         proxy.close();
 
         String[] primaryKeys = null;
@@ -195,7 +180,6 @@ public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSin
         this.option = ReaderOption.builder()
             .shardMetadata(metadata)
             .properties(clickhouseProperties)
-            .fields(fields)
             .tableEngine(table.getEngine())
             .tableSchema(tableSchema)
             .bulkSize(config.getInt(BULK_SIZE.key()))
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index deb015453..f1f7fa531 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -135,7 +135,6 @@ public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitIn
                     .setRowType(option.getSeaTunnelRowType())
                     .setPrimaryKeys(option.getPrimaryKeys())
                     .setClickhouseTableSchema(option.getTableSchema())
-                    .setProjectionFields(option.getFields().toArray(new String[0]))
                     .setAllowExperimentalLightweightDelete(option.isAllowExperimentalLightweightDelete())
                     .setSupportUpsert(option.isSupportUpsert())
                     .build();
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
index 7b5a4d249..2f10565c3 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
@@ -63,8 +63,6 @@ public class BufferedBatchStatementExecutor implements JdbcBatchStatementExecuto
         if (!buffer.isEmpty()) {
             executeBatch();
         }
-        if (statementExecutor != null) {
-            statementExecutor.closeStatements();
-        }
+        statementExecutor.closeStatements();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
index cf2508841..a13681d3a 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
@@ -40,7 +40,6 @@ public class JdbcBatchStatementExecutorBuilder {
     private String tableEngine;
     private SeaTunnelRowType rowType;
     private String[] primaryKeys;
-    private String[] projectionFields;
     private Map<String, String> clickhouseTableSchema;
     private boolean supportUpsert;
     private boolean allowExperimentalLightweightDelete;
@@ -53,15 +52,15 @@ public class JdbcBatchStatementExecutorBuilder {
 
     private boolean supportReplacingMergeTreeTableUpsert() {
         return tableEngine.endsWith(REPLACING_MERGE_TREE_ENGINE_SUFFIX)
-            && Objects.equals(primaryKeys, orderByKeys);
+            && Arrays.equals(primaryKeys, orderByKeys);
     }
 
     private String[] getDefaultProjectionFields() {
         List<String> fieldNames = Arrays.asList(rowType.getFieldNames());
         return clickhouseTableSchema.keySet()
             .stream()
-            .filter(field -> fieldNames.contains(field))
-            .toArray(value -> new String[0]);
+            .filter(fieldNames::contains)
+            .toArray(String[]::new);
     }
 
     public JdbcBatchStatementExecutor build() {
@@ -69,12 +68,9 @@ public class JdbcBatchStatementExecutorBuilder {
         Objects.requireNonNull(tableEngine);
         Objects.requireNonNull(rowType);
         Objects.requireNonNull(clickhouseTableSchema);
-        if (projectionFields == null) {
-            projectionFields = getDefaultProjectionFields();
-        }
 
         JdbcRowConverter valueRowConverter = new JdbcRowConverter(
-            rowType, clickhouseTableSchema, projectionFields);
+            rowType, clickhouseTableSchema, getDefaultProjectionFields());
         if (primaryKeys == null || primaryKeys.length == 0) {
             // INSERT: writer all events when primary-keys is empty
             return createInsertBufferedExecutor(table, rowType, valueRowConverter);
@@ -192,8 +188,8 @@ public class JdbcBatchStatementExecutorBuilder {
 
     private static SeaTunnelDataType[] getKeyTypes(int[] pkFields, SeaTunnelRowType rowType) {
         return Arrays.stream(pkFields)
-            .mapToObj((IntFunction<SeaTunnelDataType>) index -> rowType.getFieldType(index))
-            .toArray(length -> new SeaTunnelDataType[length]);
+            .mapToObj((IntFunction<SeaTunnelDataType>) rowType::getFieldType)
+            .toArray(SeaTunnelDataType[]::new);
     }
 
     private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
@@ -205,7 +201,7 @@ public class JdbcBatchStatementExecutorBuilder {
             SeaTunnelRow newRow = new SeaTunnelRow(fields);
             newRow.setTableId(row.getTableId());
             newRow.setRowKind(row.getRowKind());
-            return row;
+            return newRow;
         };
     }
 }
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
index 448eaca2a..0360bc72f 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
@@ -30,7 +30,7 @@ public class SqlUtils {
     public static String getInsertIntoStatement(String tableName,
                                                 String[] fieldNames) {
         String columns = Arrays.stream(fieldNames)
-            .map(fieldName -> quoteIdentifier(fieldName))
+            .map(SqlUtils::quoteIdentifier)
             .collect(Collectors.joining(", "));
         String placeholders = Arrays.stream(fieldNames)
             .map(fieldName -> "?")
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 1da905450..192ad6815 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -48,7 +47,6 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
@@ -120,18 +118,7 @@ public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, Clickhous
             table.getEngine(),
             false, // we don't need to set splitMode in clickhouse file mode.
             new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
-        List<String> fields;
-        if (config.hasPath(FIELDS.key())) {
-            fields = config.getStringList(FIELDS.key());
-            // check if the fields exist in schema
-            for (String field : fields) {
-                if (!tableSchema.containsKey(field)) {
-                    throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key()));
-                }
-            }
-        } else {
-            fields = new ArrayList<>(tableSchema.keySet());
-        }
+        List<String> fields = new ArrayList<>(tableSchema.keySet());
         Map<String, String> nodeUser = config.getObjectList(NODE_PASS.key()).stream()
             .collect(Collectors.toMap(configObject -> configObject.toConfig().getString(NODE_ADDRESS),
                 configObject -> configObject.toConfig().hasPath(USERNAME.key()) ? configObject.toConfig().getString(USERNAME.key()) : "root"));
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
index 829d8e005..fc5b2ecca 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
@@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -48,6 +47,6 @@ public class ClickhouseFileSinkFactory implements TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH)
-            .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
+            .optional(COPY_METHOD, SHARDING_KEY, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index 05ca6d359..c4baa7871 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -368,10 +368,7 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource {
     private Boolean compare(String sql) {
         try (Statement statement = connection.createStatement()) {
             ResultSet resultSet = statement.executeQuery(sql);
-            while (resultSet.next()) {
-                return false;
-            }
-            return true;
+            return !resultSet.next();
         } catch (SQLException e) {
             throw new RuntimeException("result compare error", e);
         }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf
index 41ecae569..1137c7c93 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf
@@ -43,38 +43,6 @@ sink {
     host = "clickhouse:8123"
     database = "default"
     table = "sink_table"
-    fields = [
-    "id",
-    "c_map",
-    "c_array_string",
-    "c_array_short",
-    "c_array_int",
-    "c_array_long",
-    "c_array_float",
-    "c_array_double",
-    "c_string",
-    "c_boolean",
-    "c_int8",
-    "c_int16",
-    "c_int32",
-    "c_int64",
-    "c_float32",
-    "c_float64",
-    "c_decimal",
-    "c_date",
-    "c_datetime",
-    "c_nullable",
-    "c_lowcardinality",
-    "c_nested.int",
-    "c_nested.double",
-    "c_nested.string",
-    "c_int128",
-    "c_uint128",
-    "c_int256",
-    "c_uint256",
-    "c_point",
-    "c_ring"
-    ]
     username = "default"
     password = ""
    }