You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/10 06:38:53 UTC

[incubator-inlong] branch master updated: [INLONG-3037][Manager] Add field mapping support for source and sink in manage client (#3038)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d4e96a4  [INLONG-3037][Manager] Add field mapping support for source and sink in manage client (#3038)
d4e96a4 is described below

commit d4e96a44623312768dcbfd5513dc0ebacdba1609
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Thu Mar 10 14:38:24 2022 +0800

    [INLONG-3037][Manager] Add field mapping support for source and sink in manage client (#3038)
---
 .../client/api/{StreamSink.java => SinkField.java} |  31 +++---
 .../inlong/manager/client/api/StreamSink.java      |   5 +-
 .../manager/client/api/sink/ClickHouseSink.java    |   6 +-
 .../inlong/manager/client/api/sink/HiveSink.java   |  46 ++++----
 .../inlong/manager/client/api/sink/KafkaSink.java  |   4 +-
 .../client/api/util/InlongStreamSinkTransfer.java  | 116 +++++++++------------
 .../pojo/sink/ck/ClickHouseSinkResponse.java       |  19 +---
 7 files changed, 96 insertions(+), 131 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
similarity index 60%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
index 9646de1..7a0b3e3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
@@ -19,21 +19,24 @@ package org.apache.inlong.manager.client.api;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.List;
 import lombok.Data;
-import org.apache.inlong.manager.common.enums.SinkType;
+import lombok.NoArgsConstructor;
 
 @Data
-@ApiModel("Stream sink configuration")
-public abstract class StreamSink {
-
-    @ApiModelProperty(value = "DataSink name", required = true)
-    private String sinkName;
-
-    public abstract SinkType getSinkType();
-
-    public abstract List<StreamField> getStreamFields();
-
-    public abstract DataFormat getDataFormat();
-
+@NoArgsConstructor
+@ApiModel("Sink field configuration")
+public class SinkField extends StreamField {
+
+    @ApiModelProperty("Source field name")
+    private String sourceFieldName;
+
+    @ApiModelProperty("Source field type")
+    private String sourceFieldType;
+
+    public SinkField(int index, FieldType fieldType, String fieldName, String fieldComment,
+            String fieldValue, String sourceFieldName, String sourceFieldType) {
+        super(index, fieldType, fieldName, fieldComment, fieldValue);
+        this.sourceFieldName = sourceFieldName;
+        this.sourceFieldType = sourceFieldType;
+    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index 9646de1..df6f610 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -19,10 +19,11 @@ package org.apache.inlong.manager.client.api;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.List;
 import lombok.Data;
 import org.apache.inlong.manager.common.enums.SinkType;
 
+import java.util.List;
+
 @Data
 @ApiModel("Stream sink configuration")
 public abstract class StreamSink {
@@ -32,7 +33,7 @@ public abstract class StreamSink {
 
     public abstract SinkType getSinkType();
 
-    public abstract List<StreamField> getStreamFields();
+    public abstract List<SinkField> getSinkFields();
 
     public abstract DataFormat getDataFormat();
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
index dcb3477..1bd24ad 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
@@ -24,7 +24,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.SinkField;
 import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.common.enums.SinkType;
@@ -78,8 +78,8 @@ public class ClickHouseSink extends StreamSink {
     @ApiModelProperty("Create topic or not")
     private boolean needCreated;
 
-    @ApiModelProperty("Field definitions for kafka")
-    private List<StreamField> streamFields;
+    @ApiModelProperty("Field definitions for clickhouse")
+    private List<SinkField> sinkFields;
 
     @ApiModelProperty("Other properties if need")
     private Map<String, String> properties;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index f93adfb..d9c1dde 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -19,21 +19,22 @@ package org.apache.inlong.manager.client.api.sink;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.DataSeparator;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.SinkField;
 import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.common.enums.SinkType;
 
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
 @Data
 @EqualsAndHashCode(callSuper = true)
 @NoArgsConstructor
@@ -67,39 +68,32 @@ public class HiveSink extends StreamSink {
 
     @ApiModelProperty("Data separator, stored as ASCII code")
     private DataSeparator dataSeparator = DataSeparator.SOH;
-
-    public enum FileFormat {
-        TextFile, RCFile, SequenceFile, Avro;
-
-        public static FileFormat forName(String name) {
-            for (FileFormat value : values()) {
-                if (value.name().equals(name)) {
-                    return value;
-                }
-            }
-            throw new IllegalArgumentException(String.format("Unsupport FileFormat:%s", name));
-        }
-    }
-
     @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
     private FileFormat fileFormat;
-
     @ApiModelProperty("Create table or not")
     private boolean needCreated;
-
     @ApiModelProperty("Primary partition field, default null")
     private String primaryPartition;
-
     @ApiModelProperty("Secondary partition field, default null")
     private String secondaryPartition;
-
     @ApiModelProperty("Field definitions for hive")
-    private List<StreamField> streamFields;
-
+    private List<SinkField> sinkFields;
     @ApiModelProperty("Other properties if need")
     private Map<String, String> properties;
-
     @ApiModelProperty("Data format type for stream sink")
     private DataFormat dataFormat;
+
+    public enum FileFormat {
+        TextFile, RCFile, SequenceFile, Avro;
+
+        public static FileFormat forName(String name) {
+            for (FileFormat value : values()) {
+                if (value.name().equals(name)) {
+                    return value;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Unsupport FileFormat:%s", name));
+        }
+    }
 }
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
index fd83948..935f383 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
@@ -24,7 +24,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.SinkField;
 import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.common.enums.SinkType;
 
@@ -54,7 +54,7 @@ public class KafkaSink extends StreamSink {
     private boolean needCreated;
 
     @ApiModelProperty("Field definitions for kafka")
-    private List<StreamField> streamFields;
+    private List<SinkField> sinkFields;
 
     @ApiModelProperty("Other properties if need")
     private Map<String, String> properties;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 496daab..8755dfe 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -21,7 +21,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.DataSeparator;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.SinkField;
 import org.apache.inlong.manager.client.api.StreamField.FieldType;
 import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.client.api.sink.HiveSink.FileFormat;
 import org.apache.inlong.manager.client.api.sink.KafkaSink;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkRequest;
@@ -39,7 +40,6 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 
@@ -108,52 +108,54 @@ public class InlongStreamSinkTransfer {
         clickHouseSinkRequest.setInlongGroupId(streamInfo.getInlongGroupId());
         clickHouseSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
         clickHouseSinkRequest.setEnableCreateResource(clickHouseSink.isNeedCreated() ? 1 : 0);
-        if (CollectionUtils.isNotEmpty(clickHouseSink.getStreamFields())) {
-            List<SinkFieldRequest> fieldRequests = createSinkFields(streamSink.getStreamFields(),
-                    streamInfo.getFieldList());
+        if (CollectionUtils.isNotEmpty(clickHouseSink.getSinkFields())) {
+            List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(streamSink.getSinkFields());
             clickHouseSinkRequest.setFieldList(fieldRequests);
         }
         return clickHouseSinkRequest;
     }
 
-    private static StreamSink parseClickHouseSink(ClickHouseSinkResponse clickHouseSinkResponse,
+    private static StreamSink parseClickHouseSink(ClickHouseSinkResponse sinkResponse,
             StreamSink streamSink) {
         ClickHouseSink clickHouseSink = new ClickHouseSink();
         if (streamSink != null) {
-            AssertUtil.isTrue(clickHouseSinkResponse.getSinkName().equals(streamSink.getSinkName()),
-                    String.format("SinkName is not equal: %s != %s", clickHouseSinkResponse, streamSink));
+            AssertUtil.isTrue(sinkResponse.getSinkName().equals(streamSink.getSinkName()),
+                    String.format("SinkName is not equal: %s != %s", sinkResponse, streamSink));
             ClickHouseSink snapshot = (ClickHouseSink) streamSink;
             clickHouseSink = CommonBeanUtils.copyProperties(snapshot, ClickHouseSink::new);
         } else {
-            clickHouseSink.setDistributedTable(clickHouseSinkResponse.getDistributedTable());
-            clickHouseSink.setSinkName(clickHouseSinkResponse.getSinkName());
-            clickHouseSink.setFlushInterval(clickHouseSinkResponse.getFlushInterval());
-            clickHouseSink.setAuthentication(new DefaultAuthentication(clickHouseSinkResponse.getSinkName(),
-                    clickHouseSinkResponse.getPassword()));
-            clickHouseSink.setDatabaseName(clickHouseSinkResponse.getDatabaseName());
-            clickHouseSink.setFlushRecordNumber(clickHouseSinkResponse.getFlushRecordNumber());
-            clickHouseSink.setJdbcUrl(clickHouseSinkResponse.getJdbcUrl());
-            clickHouseSink.setPartitionKey(clickHouseSinkResponse.getPartitionKey());
-            clickHouseSink.setKeyFieldNames(clickHouseSinkResponse.getKeyFieldNames());
-            clickHouseSink.setPartitionStrategy(clickHouseSinkResponse.getPartitionStrategy());
-            clickHouseSink.setWriteMaxRetryTimes(clickHouseSinkResponse.getWriteMaxRetryTimes());
-            clickHouseSink.setDistributedTable(clickHouseSinkResponse.getDistributedTable());
+            clickHouseSink.setDistributedTable(sinkResponse.getDistributedTable());
+            clickHouseSink.setSinkName(sinkResponse.getSinkName());
+            clickHouseSink.setFlushInterval(sinkResponse.getFlushInterval());
+            clickHouseSink.setAuthentication(new DefaultAuthentication(sinkResponse.getSinkName(),
+                    sinkResponse.getPassword()));
+            clickHouseSink.setDatabaseName(sinkResponse.getDatabaseName());
+            clickHouseSink.setFlushRecordNumber(sinkResponse.getFlushRecordNumber());
+            clickHouseSink.setJdbcUrl(sinkResponse.getJdbcUrl());
+            clickHouseSink.setPartitionKey(sinkResponse.getPartitionKey());
+            clickHouseSink.setKeyFieldNames(sinkResponse.getKeyFieldNames());
+            clickHouseSink.setPartitionStrategy(sinkResponse.getPartitionStrategy());
+            clickHouseSink.setWriteMaxRetryTimes(sinkResponse.getWriteMaxRetryTimes());
+            clickHouseSink.setDistributedTable(sinkResponse.getDistributedTable());
         }
 
-        clickHouseSink.setNeedCreated(clickHouseSinkResponse.getEnableCreateResource() == 1);
-        List<StreamField> fieldList = clickHouseSinkResponse.getFieldList()
-                .stream()
-                .map(storageFieldRequest -> {
-                    return new StreamField(storageFieldRequest.getId(),
-                            FieldType.forName(storageFieldRequest.getFieldType()),
-                            storageFieldRequest.getFieldName(),
-                            storageFieldRequest.getFieldComment(),
-                            null);
-                }).collect(Collectors.toList());
-        clickHouseSink.setStreamFields(fieldList);
+        clickHouseSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
+        if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
+            clickHouseSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
+        }
         return clickHouseSink;
     }
 
+    private static List<SinkField> convertToSinkFields(List<SinkFieldResponse> sinkFieldResponses) {
+        return sinkFieldResponses.stream().map(sinkFieldResponse -> new SinkField(sinkFieldResponse.getId(),
+                FieldType.forName(sinkFieldResponse.getFieldType()),
+                sinkFieldResponse.getFieldName(),
+                sinkFieldResponse.getFieldComment(),
+                null, sinkFieldResponse.getSourceFieldName(),
+                sinkFieldResponse.getSourceFieldType())).collect(Collectors.toList());
+
+    }
+
     private static SinkRequest createKafkaRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
         KafkaSinkRequest kafkaSinkRequest = new KafkaSinkRequest();
         KafkaSink kafkaSink = (KafkaSink) streamSink;
@@ -165,9 +167,8 @@ public class InlongStreamSinkTransfer {
         kafkaSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
         kafkaSinkRequest.setSerializationType(kafkaSink.getDataFormat().name());
         kafkaSinkRequest.setEnableCreateResource(kafkaSink.isNeedCreated() ? 1 : 0);
-        if (CollectionUtils.isNotEmpty(kafkaSink.getStreamFields())) {
-            List<SinkFieldRequest> fieldRequests = createSinkFields(kafkaSink.getStreamFields(),
-                    streamInfo.getFieldList());
+        if (CollectionUtils.isNotEmpty(kafkaSink.getSinkFields())) {
+            List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(kafkaSink.getSinkFields());
             kafkaSinkRequest.setFieldList(fieldRequests);
         }
         return kafkaSinkRequest;
@@ -189,19 +190,9 @@ public class InlongStreamSinkTransfer {
             kafkaSink.setTopicName(sinkResponse.getTopicName());
             kafkaSink.setDataFormat(DataFormat.forName(sinkResponse.getSerializationType()));
         }
-
         kafkaSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
-        if (sinkResponse.getFieldList() != null && !sinkResponse.getFieldList().isEmpty()) {
-            List<StreamField> fieldList = sinkResponse.getFieldList()
-                    .stream()
-                    .map(storageFieldRequest -> {
-                        return new StreamField(storageFieldRequest.getId(),
-                                FieldType.forName(storageFieldRequest.getFieldType()),
-                                storageFieldRequest.getFieldName(),
-                                storageFieldRequest.getFieldComment(),
-                                null);
-                    }).collect(Collectors.toList());
-            kafkaSink.setStreamFields(fieldList);
+        if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
+            kafkaSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
         }
         return kafkaSink;
     }
@@ -229,28 +220,22 @@ public class InlongStreamSinkTransfer {
         hiveSinkRequest.setPassword(defaultAuthentication.getPassword());
         hiveSinkRequest.setPrimaryPartition(hiveSink.getPrimaryPartition());
         hiveSinkRequest.setSecondaryPartition(hiveSink.getSecondaryPartition());
-        if (CollectionUtils.isNotEmpty(hiveSink.getStreamFields())) {
-            List<SinkFieldRequest> fieldRequests = createSinkFields(streamSink.getStreamFields(),
-                    streamInfo.getFieldList());
+        if (CollectionUtils.isNotEmpty(hiveSink.getSinkFields())) {
+            List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(streamSink.getSinkFields());
             hiveSinkRequest.setFieldList(fieldRequests);
         }
         return hiveSinkRequest;
     }
 
-    private static List<SinkFieldRequest> createSinkFields(List<StreamField> sinkFields,
-            List<InlongStreamFieldInfo> sourceFields) {
+    private static List<SinkFieldRequest> createSinkFieldRequests(List<SinkField> sinkFields) {
         List<SinkFieldRequest> sinkFieldRequests = Lists.newArrayList();
-        for (int i = 0; i < sinkFields.size(); i++) {
+        for (SinkField sinkField : sinkFields) {
             SinkFieldRequest sinkFieldRequest = new SinkFieldRequest();
-            StreamField sinkField = sinkFields.get(i);
             sinkFieldRequest.setFieldName(sinkField.getFieldName());
             sinkFieldRequest.setFieldType(sinkField.getFieldType().toString());
             sinkFieldRequest.setFieldComment(sinkField.getFieldComment());
-            if (sourceFields.size() > i && sourceFields.get(i) != null) {
-                InlongStreamFieldInfo sourceField = sourceFields.get(i);
-                sinkFieldRequest.setSourceFieldName(sourceField.getFieldName());
-                sinkFieldRequest.setSourceFieldType(sourceField.getFieldType());
-            }
+            sinkFieldRequest.setSourceFieldName(sinkField.getSourceFieldName());
+            sinkFieldRequest.setSourceFieldType(sinkField.getSourceFieldType());
             sinkFieldRequests.add(sinkFieldRequest);
         }
         return sinkFieldRequests;
@@ -293,16 +278,9 @@ public class InlongStreamSinkTransfer {
 
         hiveSink.setSinkType(SinkType.HIVE);
         hiveSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
-        List<StreamField> fieldList = sinkResponse.getFieldList()
-                .stream()
-                .map(storageFieldRequest -> {
-                    return new StreamField(storageFieldRequest.getId(),
-                            FieldType.forName(storageFieldRequest.getFieldType()),
-                            storageFieldRequest.getFieldName(),
-                            storageFieldRequest.getFieldComment(),
-                            null);
-                }).collect(Collectors.toList());
-        hiveSink.setStreamFields(fieldList);
+        if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
+            hiveSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
+        }
         return hiveSink;
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
index 0d66958..0e8ea89 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
@@ -34,44 +34,33 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 @ApiModel(value = "Response of the ClickHouse sink")
 public class ClickHouseSinkResponse extends SinkResponse {
 
-    public ClickHouseSinkResponse() {
-        this.sinkType = Constant.SINK_CLICKHOUSE;
-    }
-
     @ApiModelProperty("ClickHouse JDBC URL")
     private String jdbcUrl;
-
     @ApiModelProperty("Target database name")
     private String databaseName;
-
     @ApiModelProperty("Target table name")
     private String tableName;
-
     @ApiModelProperty("Username for JDBC URL")
     private String username;
-
     @ApiModelProperty("User password")
     private String password;
-
     @ApiModelProperty("Whether distributed table")
     private Boolean distributedTable;
-
     @ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
     private String partitionStrategy;
-
     @ApiModelProperty("Partition key")
     private String partitionKey;
-
     @ApiModelProperty("Key field names")
     private String[] keyFieldNames;
-
     @ApiModelProperty("Flush interval")
     private Integer flushInterval;
-
     @ApiModelProperty("Flush record number")
     private Integer flushRecordNumber;
-
     @ApiModelProperty("Write max retry times")
     private Integer writeMaxRetryTimes;
 
+    public ClickHouseSinkResponse() {
+        this.sinkType = Constant.SINK_CLICKHOUSE;
+    }
+
 }