You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/10 06:38:53 UTC
[incubator-inlong] branch master updated: [INLONG-3037][Manager] Add field mapping support for source and sink in manage client (#3038)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d4e96a4 [INLONG-3037][Manager] Add field mapping support for source and sink in manage client (#3038)
d4e96a4 is described below
commit d4e96a44623312768dcbfd5513dc0ebacdba1609
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Thu Mar 10 14:38:24 2022 +0800
[INLONG-3037][Manager] Add field mapping support for source and sink in manage client (#3038)
---
.../client/api/{StreamSink.java => SinkField.java} | 31 +++---
.../inlong/manager/client/api/StreamSink.java | 5 +-
.../manager/client/api/sink/ClickHouseSink.java | 6 +-
.../inlong/manager/client/api/sink/HiveSink.java | 46 ++++----
.../inlong/manager/client/api/sink/KafkaSink.java | 4 +-
.../client/api/util/InlongStreamSinkTransfer.java | 116 +++++++++------------
.../pojo/sink/ck/ClickHouseSinkResponse.java | 19 +---
7 files changed, 96 insertions(+), 131 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
similarity index 60%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
index 9646de1..7a0b3e3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
@@ -19,21 +19,24 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.List;
import lombok.Data;
-import org.apache.inlong.manager.common.enums.SinkType;
+import lombok.NoArgsConstructor;
@Data
-@ApiModel("Stream sink configuration")
-public abstract class StreamSink {
-
- @ApiModelProperty(value = "DataSink name", required = true)
- private String sinkName;
-
- public abstract SinkType getSinkType();
-
- public abstract List<StreamField> getStreamFields();
-
- public abstract DataFormat getDataFormat();
-
+@NoArgsConstructor
+@ApiModel("Sink field configuration")
+public class SinkField extends StreamField {
+
+ @ApiModelProperty("Source field name")
+ private String sourceFieldName;
+
+ @ApiModelProperty("Source field type")
+ private String sourceFieldType;
+
+ public SinkField(int index, FieldType fieldType, String fieldName, String fieldComment,
+ String fieldValue, String sourceFieldName, String sourceFieldType) {
+ super(index, fieldType, fieldName, fieldComment, fieldValue);
+ this.sourceFieldName = sourceFieldName;
+ this.sourceFieldType = sourceFieldType;
+ }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index 9646de1..df6f610 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -19,10 +19,11 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.List;
import lombok.Data;
import org.apache.inlong.manager.common.enums.SinkType;
+import java.util.List;
+
@Data
@ApiModel("Stream sink configuration")
public abstract class StreamSink {
@@ -32,7 +33,7 @@ public abstract class StreamSink {
public abstract SinkType getSinkType();
- public abstract List<StreamField> getStreamFields();
+ public abstract List<SinkField> getSinkFields();
public abstract DataFormat getDataFormat();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
index dcb3477..1bd24ad 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
@@ -24,7 +24,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.enums.SinkType;
@@ -78,8 +78,8 @@ public class ClickHouseSink extends StreamSink {
@ApiModelProperty("Create topic or not")
private boolean needCreated;
- @ApiModelProperty("Field definitions for kafka")
- private List<StreamField> streamFields;
+ @ApiModelProperty("Field definitions for clickhouse")
+ private List<SinkField> sinkFields;
@ApiModelProperty("Other properties if need")
private Map<String, String> properties;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index f93adfb..d9c1dde 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -19,21 +19,22 @@ package org.apache.inlong.manager.client.api.sink;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.DataSeparator;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.enums.SinkType;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@@ -67,39 +68,32 @@ public class HiveSink extends StreamSink {
@ApiModelProperty("Data separator, stored as ASCII code")
private DataSeparator dataSeparator = DataSeparator.SOH;
-
- public enum FileFormat {
- TextFile, RCFile, SequenceFile, Avro;
-
- public static FileFormat forName(String name) {
- for (FileFormat value : values()) {
- if (value.name().equals(name)) {
- return value;
- }
- }
- throw new IllegalArgumentException(String.format("Unsupport FileFormat:%s", name));
- }
- }
-
@ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
private FileFormat fileFormat;
-
@ApiModelProperty("Create table or not")
private boolean needCreated;
-
@ApiModelProperty("Primary partition field, default null")
private String primaryPartition;
-
@ApiModelProperty("Secondary partition field, default null")
private String secondaryPartition;
-
@ApiModelProperty("Field definitions for hive")
- private List<StreamField> streamFields;
-
+ private List<SinkField> sinkFields;
@ApiModelProperty("Other properties if need")
private Map<String, String> properties;
-
@ApiModelProperty("Data format type for stream sink")
private DataFormat dataFormat;
+
+ public enum FileFormat {
+ TextFile, RCFile, SequenceFile, Avro;
+
+ public static FileFormat forName(String name) {
+ for (FileFormat value : values()) {
+ if (value.name().equals(name)) {
+ return value;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Unsupport FileFormat:%s", name));
+ }
+ }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
index fd83948..935f383 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
@@ -24,7 +24,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.common.enums.SinkType;
@@ -54,7 +54,7 @@ public class KafkaSink extends StreamSink {
private boolean needCreated;
@ApiModelProperty("Field definitions for kafka")
- private List<StreamField> streamFields;
+ private List<SinkField> sinkFields;
@ApiModelProperty("Other properties if need")
private Map<String, String> properties;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 496daab..8755dfe 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -21,7 +21,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.DataSeparator;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamField.FieldType;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.client.api.sink.HiveSink.FileFormat;
import org.apache.inlong.manager.client.api.sink.KafkaSink;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkRequest;
@@ -39,7 +40,6 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -108,52 +108,54 @@ public class InlongStreamSinkTransfer {
clickHouseSinkRequest.setInlongGroupId(streamInfo.getInlongGroupId());
clickHouseSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
clickHouseSinkRequest.setEnableCreateResource(clickHouseSink.isNeedCreated() ? 1 : 0);
- if (CollectionUtils.isNotEmpty(clickHouseSink.getStreamFields())) {
- List<SinkFieldRequest> fieldRequests = createSinkFields(streamSink.getStreamFields(),
- streamInfo.getFieldList());
+ if (CollectionUtils.isNotEmpty(clickHouseSink.getSinkFields())) {
+ List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(streamSink.getSinkFields());
clickHouseSinkRequest.setFieldList(fieldRequests);
}
return clickHouseSinkRequest;
}
- private static StreamSink parseClickHouseSink(ClickHouseSinkResponse clickHouseSinkResponse,
+ private static StreamSink parseClickHouseSink(ClickHouseSinkResponse sinkResponse,
StreamSink streamSink) {
ClickHouseSink clickHouseSink = new ClickHouseSink();
if (streamSink != null) {
- AssertUtil.isTrue(clickHouseSinkResponse.getSinkName().equals(streamSink.getSinkName()),
- String.format("SinkName is not equal: %s != %s", clickHouseSinkResponse, streamSink));
+ AssertUtil.isTrue(sinkResponse.getSinkName().equals(streamSink.getSinkName()),
+ String.format("SinkName is not equal: %s != %s", sinkResponse, streamSink));
ClickHouseSink snapshot = (ClickHouseSink) streamSink;
clickHouseSink = CommonBeanUtils.copyProperties(snapshot, ClickHouseSink::new);
} else {
- clickHouseSink.setDistributedTable(clickHouseSinkResponse.getDistributedTable());
- clickHouseSink.setSinkName(clickHouseSinkResponse.getSinkName());
- clickHouseSink.setFlushInterval(clickHouseSinkResponse.getFlushInterval());
- clickHouseSink.setAuthentication(new DefaultAuthentication(clickHouseSinkResponse.getSinkName(),
- clickHouseSinkResponse.getPassword()));
- clickHouseSink.setDatabaseName(clickHouseSinkResponse.getDatabaseName());
- clickHouseSink.setFlushRecordNumber(clickHouseSinkResponse.getFlushRecordNumber());
- clickHouseSink.setJdbcUrl(clickHouseSinkResponse.getJdbcUrl());
- clickHouseSink.setPartitionKey(clickHouseSinkResponse.getPartitionKey());
- clickHouseSink.setKeyFieldNames(clickHouseSinkResponse.getKeyFieldNames());
- clickHouseSink.setPartitionStrategy(clickHouseSinkResponse.getPartitionStrategy());
- clickHouseSink.setWriteMaxRetryTimes(clickHouseSinkResponse.getWriteMaxRetryTimes());
- clickHouseSink.setDistributedTable(clickHouseSinkResponse.getDistributedTable());
+ clickHouseSink.setDistributedTable(sinkResponse.getDistributedTable());
+ clickHouseSink.setSinkName(sinkResponse.getSinkName());
+ clickHouseSink.setFlushInterval(sinkResponse.getFlushInterval());
+ clickHouseSink.setAuthentication(new DefaultAuthentication(sinkResponse.getSinkName(),
+ sinkResponse.getPassword()));
+ clickHouseSink.setDatabaseName(sinkResponse.getDatabaseName());
+ clickHouseSink.setFlushRecordNumber(sinkResponse.getFlushRecordNumber());
+ clickHouseSink.setJdbcUrl(sinkResponse.getJdbcUrl());
+ clickHouseSink.setPartitionKey(sinkResponse.getPartitionKey());
+ clickHouseSink.setKeyFieldNames(sinkResponse.getKeyFieldNames());
+ clickHouseSink.setPartitionStrategy(sinkResponse.getPartitionStrategy());
+ clickHouseSink.setWriteMaxRetryTimes(sinkResponse.getWriteMaxRetryTimes());
+ clickHouseSink.setDistributedTable(sinkResponse.getDistributedTable());
}
- clickHouseSink.setNeedCreated(clickHouseSinkResponse.getEnableCreateResource() == 1);
- List<StreamField> fieldList = clickHouseSinkResponse.getFieldList()
- .stream()
- .map(storageFieldRequest -> {
- return new StreamField(storageFieldRequest.getId(),
- FieldType.forName(storageFieldRequest.getFieldType()),
- storageFieldRequest.getFieldName(),
- storageFieldRequest.getFieldComment(),
- null);
- }).collect(Collectors.toList());
- clickHouseSink.setStreamFields(fieldList);
+ clickHouseSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
+ if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
+ clickHouseSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
+ }
return clickHouseSink;
}
+ private static List<SinkField> convertToSinkFields(List<SinkFieldResponse> sinkFieldResponses) {
+ return sinkFieldResponses.stream().map(sinkFieldResponse -> new SinkField(sinkFieldResponse.getId(),
+ FieldType.forName(sinkFieldResponse.getFieldType()),
+ sinkFieldResponse.getFieldName(),
+ sinkFieldResponse.getFieldComment(),
+ null, sinkFieldResponse.getSourceFieldName(),
+ sinkFieldResponse.getSourceFieldType())).collect(Collectors.toList());
+
+ }
+
private static SinkRequest createKafkaRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
KafkaSinkRequest kafkaSinkRequest = new KafkaSinkRequest();
KafkaSink kafkaSink = (KafkaSink) streamSink;
@@ -165,9 +167,8 @@ public class InlongStreamSinkTransfer {
kafkaSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
kafkaSinkRequest.setSerializationType(kafkaSink.getDataFormat().name());
kafkaSinkRequest.setEnableCreateResource(kafkaSink.isNeedCreated() ? 1 : 0);
- if (CollectionUtils.isNotEmpty(kafkaSink.getStreamFields())) {
- List<SinkFieldRequest> fieldRequests = createSinkFields(kafkaSink.getStreamFields(),
- streamInfo.getFieldList());
+ if (CollectionUtils.isNotEmpty(kafkaSink.getSinkFields())) {
+ List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(kafkaSink.getSinkFields());
kafkaSinkRequest.setFieldList(fieldRequests);
}
return kafkaSinkRequest;
@@ -189,19 +190,9 @@ public class InlongStreamSinkTransfer {
kafkaSink.setTopicName(sinkResponse.getTopicName());
kafkaSink.setDataFormat(DataFormat.forName(sinkResponse.getSerializationType()));
}
-
kafkaSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
- if (sinkResponse.getFieldList() != null && !sinkResponse.getFieldList().isEmpty()) {
- List<StreamField> fieldList = sinkResponse.getFieldList()
- .stream()
- .map(storageFieldRequest -> {
- return new StreamField(storageFieldRequest.getId(),
- FieldType.forName(storageFieldRequest.getFieldType()),
- storageFieldRequest.getFieldName(),
- storageFieldRequest.getFieldComment(),
- null);
- }).collect(Collectors.toList());
- kafkaSink.setStreamFields(fieldList);
+ if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
+ kafkaSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
}
return kafkaSink;
}
@@ -229,28 +220,22 @@ public class InlongStreamSinkTransfer {
hiveSinkRequest.setPassword(defaultAuthentication.getPassword());
hiveSinkRequest.setPrimaryPartition(hiveSink.getPrimaryPartition());
hiveSinkRequest.setSecondaryPartition(hiveSink.getSecondaryPartition());
- if (CollectionUtils.isNotEmpty(hiveSink.getStreamFields())) {
- List<SinkFieldRequest> fieldRequests = createSinkFields(streamSink.getStreamFields(),
- streamInfo.getFieldList());
+ if (CollectionUtils.isNotEmpty(hiveSink.getSinkFields())) {
+ List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(streamSink.getSinkFields());
hiveSinkRequest.setFieldList(fieldRequests);
}
return hiveSinkRequest;
}
- private static List<SinkFieldRequest> createSinkFields(List<StreamField> sinkFields,
- List<InlongStreamFieldInfo> sourceFields) {
+ private static List<SinkFieldRequest> createSinkFieldRequests(List<SinkField> sinkFields) {
List<SinkFieldRequest> sinkFieldRequests = Lists.newArrayList();
- for (int i = 0; i < sinkFields.size(); i++) {
+ for (SinkField sinkField : sinkFields) {
SinkFieldRequest sinkFieldRequest = new SinkFieldRequest();
- StreamField sinkField = sinkFields.get(i);
sinkFieldRequest.setFieldName(sinkField.getFieldName());
sinkFieldRequest.setFieldType(sinkField.getFieldType().toString());
sinkFieldRequest.setFieldComment(sinkField.getFieldComment());
- if (sourceFields.size() > i && sourceFields.get(i) != null) {
- InlongStreamFieldInfo sourceField = sourceFields.get(i);
- sinkFieldRequest.setSourceFieldName(sourceField.getFieldName());
- sinkFieldRequest.setSourceFieldType(sourceField.getFieldType());
- }
+ sinkFieldRequest.setSourceFieldName(sinkField.getSourceFieldName());
+ sinkFieldRequest.setSourceFieldType(sinkField.getSourceFieldType());
sinkFieldRequests.add(sinkFieldRequest);
}
return sinkFieldRequests;
@@ -293,16 +278,9 @@ public class InlongStreamSinkTransfer {
hiveSink.setSinkType(SinkType.HIVE);
hiveSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
- List<StreamField> fieldList = sinkResponse.getFieldList()
- .stream()
- .map(storageFieldRequest -> {
- return new StreamField(storageFieldRequest.getId(),
- FieldType.forName(storageFieldRequest.getFieldType()),
- storageFieldRequest.getFieldName(),
- storageFieldRequest.getFieldComment(),
- null);
- }).collect(Collectors.toList());
- hiveSink.setStreamFields(fieldList);
+ if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
+ hiveSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
+ }
return hiveSink;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
index 0d66958..0e8ea89 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
@@ -34,44 +34,33 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
@ApiModel(value = "Response of the ClickHouse sink")
public class ClickHouseSinkResponse extends SinkResponse {
- public ClickHouseSinkResponse() {
- this.sinkType = Constant.SINK_CLICKHOUSE;
- }
-
@ApiModelProperty("ClickHouse JDBC URL")
private String jdbcUrl;
-
@ApiModelProperty("Target database name")
private String databaseName;
-
@ApiModelProperty("Target table name")
private String tableName;
-
@ApiModelProperty("Username for JDBC URL")
private String username;
-
@ApiModelProperty("User password")
private String password;
-
@ApiModelProperty("Whether distributed table")
private Boolean distributedTable;
-
@ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
private String partitionStrategy;
-
@ApiModelProperty("Partition key")
private String partitionKey;
-
@ApiModelProperty("Key field names")
private String[] keyFieldNames;
-
@ApiModelProperty("Flush interval")
private Integer flushInterval;
-
@ApiModelProperty("Flush record number")
private Integer flushRecordNumber;
-
@ApiModelProperty("Write max retry times")
private Integer writeMaxRetryTimes;
+ public ClickHouseSinkResponse() {
+ this.sinkType = Constant.SINK_CLICKHOUSE;
+ }
+
}