You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 04:52:16 UTC
[inlong] branch master updated: [INLONG-5703][Manager] Add separator-related fields for some sources (#5706)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 09d0f30cb [INLONG-5703][Manager] Add separator-related fields for some sources (#5706)
09d0f30cb is described below
commit 09d0f30cb21807f8b926c185afb2ce5ffcf92ea0
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Fri Aug 26 12:52:12 2022 +0800
[INLONG-5703][Manager] Add separator-related fields for some sources (#5706)
---
.../apache/inlong/manager/common/consts/InlongConstants.java | 5 -----
.../inlong/manager/pojo/sort/util/ExtractNodeUtils.java | 8 ++++----
.../inlong/manager/pojo/source/autopush/AutoPushSource.java | 9 +++++++++
.../manager/pojo/source/autopush/AutoPushSourceDTO.java | 9 +++++++++
.../manager/pojo/source/autopush/AutoPushSourceRequest.java | 11 +++++++++++
.../apache/inlong/manager/pojo/source/kafka/KafkaSource.java | 9 +++++++++
.../inlong/manager/pojo/source/kafka/KafkaSourceDTO.java | 9 +++++++++
.../inlong/manager/pojo/source/kafka/KafkaSourceRequest.java | 11 +++++++++++
.../inlong/manager/pojo/source/pulsar/PulsarSource.java | 9 +++++++++
.../inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java | 9 +++++++++
.../manager/pojo/source/pulsar/PulsarSourceRequest.java | 11 +++++++++++
.../manager/service/source/pulsar/PulsarSourceOperator.java | 7 +++++--
12 files changed, 96 insertions(+), 11 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 5dc922009..28c2435d2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -110,9 +110,4 @@ public class InlongConstants {
public static final String SORT_PROPERTIES = "sort.properties";
- /**
- * common config
- */
- public static final String FIELD_DELIMITER = "fieldDelimiter";
-
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 626db78c6..c94f84af4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -23,8 +23,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
@@ -232,9 +232,9 @@ public class ExtractNodeUtils {
DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
switch (dataType) {
case CSV:
- String fieldDelimiter = (String) pulsarSource.getProperties()
- .get(InlongConstants.FIELD_DELIMITER);
- format = StringUtils.isBlank(fieldDelimiter) ? new CsvFormat() : new CsvFormat(fieldDelimiter);
+ String separator = DataSeparator
+ .forAscii(Integer.parseInt(pulsarSource.getDataSeparator())).getSeparator();
+ format = new CsvFormat(separator);
break;
case AVRO:
format = new AvroFormat();
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java
index d67a92ee1..ebd23d5fc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java
@@ -45,6 +45,15 @@ public class AutoPushSource extends StreamSource {
@ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
private String dataProxyGroup;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding;
+
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
+
public AutoPushSource() {
this.setSourceType(SourceType.AUTO_PUSH);
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
index 1077230f8..baa0c872c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
@@ -43,6 +43,15 @@ public class AutoPushSourceDTO {
@ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
private String dataProxyGroup;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding;
+
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
+
public static AutoPushSourceDTO getFromRequest(AutoPushSourceRequest request) {
return AutoPushSourceDTO.builder()
.dataProxyGroup(request.getDataProxyGroup())
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
index c8b8010d2..6ee58efd0 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.pojo.source.autopush;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -39,6 +41,15 @@ public class AutoPushSourceRequest extends SourceRequest {
@ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
private String dataProxyGroup;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding = StandardCharsets.UTF_8.toString();
+
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
+ private String dataSeparator = DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
+
public AutoPushSourceRequest() {
this.setSourceType(SourceType.AUTO_PUSH);
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
index 2c9ca8004..74d480b35 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
@@ -81,6 +81,15 @@ public class KafkaSource extends StreamSource {
@ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
private String primaryKey;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding;
+
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
+
public KafkaSource() {
this.setSourceType(SourceType.KAFKA);
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
index 70d72cccc..e81c373a7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
@@ -90,6 +90,15 @@ public class KafkaSourceDTO {
@ApiModelProperty("Field needed when serializationType is csv,json,avro")
private String primaryKey;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding;
+
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
+
@ApiModelProperty("Properties for Kafka")
private Map<String, Object> properties;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
index 2934fd83a..120567218 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.pojo.source.kafka;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -77,6 +79,15 @@ public class KafkaSourceRequest extends SourceRequest {
@ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
private String primaryKey;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding = StandardCharsets.UTF_8.toString();
+
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
+ private String dataSeparator = DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
+
public KafkaSourceRequest() {
this.setSourceType(SourceType.KAFKA);
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index 763aefb24..e1be7d1cb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -62,6 +62,15 @@ public class PulsarSource extends StreamSource {
@ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
private String primaryKey;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding;
+
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
+
@ApiModelProperty("Configure the Source's startup mode. "
+ "Available options are earliest, latest, external-subscription, and specific-offsets.")
@Builder.Default
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
index 602b6f74b..17c540a88 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
@@ -59,6 +59,15 @@ public class PulsarSourceDTO {
@ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
private String primaryKey;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding;
+
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
+
@ApiModelProperty("Configure the Source's startup mode. "
+ "Available options are earliest, latest, external-subscription, and specific-offsets.")
@Builder.Default
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
index 497ea2240..e883e1caa 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.pojo.source.pulsar;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -54,6 +56,15 @@ public class PulsarSourceRequest extends SourceRequest {
@ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
private String primaryKey;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding = StandardCharsets.UTF_8.toString();
+
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
+ private String dataSeparator = DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
+
@ApiModelProperty("Configure the Source's startup mode."
+ " Available options are earliest, latest, external-subscription, and specific-offsets.")
private String scanStartupMode = "earliest";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 1bc490fda..0b079c269 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -26,6 +26,7 @@ import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -152,8 +153,10 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
}
if (DataTypeEnum.CSV.getName().equalsIgnoreCase(pulsarSource.getSerializationType())) {
- Map<String, Object> properties = pulsarSource.getProperties();
- properties.put(InlongConstants.FIELD_DELIMITER, streamInfo.getDataSeparator());
+ pulsarSource.setDataSeparator(streamInfo.getDataSeparator());
+ if (StringUtils.isEmpty(pulsarSource.getDataSeparator())) {
+ pulsarSource.setDataSeparator(DataSeparator.COMMA.getAsciiCode().toString());
+ }
}
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
pulsarSource.setFieldList(streamInfo.getFieldList());