You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 04:52:16 UTC

[inlong] branch master updated: [INLONG-5703][Manager] Add separator-related fields for some sources (#5706)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 09d0f30cb [INLONG-5703][Manager] Add separator-related fields for some sources (#5706)
09d0f30cb is described below

commit 09d0f30cb21807f8b926c185afb2ce5ffcf92ea0
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Fri Aug 26 12:52:12 2022 +0800

    [INLONG-5703][Manager] Add separator-related fields for some sources (#5706)
---
 .../apache/inlong/manager/common/consts/InlongConstants.java  |  5 -----
 .../inlong/manager/pojo/sort/util/ExtractNodeUtils.java       |  8 ++++----
 .../inlong/manager/pojo/source/autopush/AutoPushSource.java   |  9 +++++++++
 .../manager/pojo/source/autopush/AutoPushSourceDTO.java       |  9 +++++++++
 .../manager/pojo/source/autopush/AutoPushSourceRequest.java   | 11 +++++++++++
 .../apache/inlong/manager/pojo/source/kafka/KafkaSource.java  |  9 +++++++++
 .../inlong/manager/pojo/source/kafka/KafkaSourceDTO.java      |  9 +++++++++
 .../inlong/manager/pojo/source/kafka/KafkaSourceRequest.java  | 11 +++++++++++
 .../inlong/manager/pojo/source/pulsar/PulsarSource.java       |  9 +++++++++
 .../inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java    |  9 +++++++++
 .../manager/pojo/source/pulsar/PulsarSourceRequest.java       | 11 +++++++++++
 .../manager/service/source/pulsar/PulsarSourceOperator.java   |  7 +++++--
 12 files changed, 96 insertions(+), 11 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 5dc922009..28c2435d2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -110,9 +110,4 @@ public class InlongConstants {
 
     public static final String SORT_PROPERTIES = "sort.properties";
 
-    /**
-     * common config
-     */
-    public static final String FIELD_DELIMITER = "fieldDelimiter";
-
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 626db78c6..c94f84af4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -23,8 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
@@ -232,9 +232,9 @@ public class ExtractNodeUtils {
         DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
         switch (dataType) {
             case CSV:
-                String fieldDelimiter = (String) pulsarSource.getProperties()
-                        .get(InlongConstants.FIELD_DELIMITER);
-                format = StringUtils.isBlank(fieldDelimiter) ? new CsvFormat() : new CsvFormat(fieldDelimiter);
+                String separator = DataSeparator
+                        .forAscii(Integer.parseInt(pulsarSource.getDataSeparator())).getSeparator();
+                format = new CsvFormat(separator);
                 break;
             case AVRO:
                 format = new AvroFormat();
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java
index d67a92ee1..ebd23d5fc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java
@@ -45,6 +45,15 @@ public class AutoPushSource extends StreamSource {
     @ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
     private String dataProxyGroup;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public AutoPushSource() {
         this.setSourceType(SourceType.AUTO_PUSH);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
index 1077230f8..baa0c872c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
@@ -43,6 +43,15 @@ public class AutoPushSourceDTO {
     @ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
     private String dataProxyGroup;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public static AutoPushSourceDTO getFromRequest(AutoPushSourceRequest request) {
         return AutoPushSourceDTO.builder()
                 .dataProxyGroup(request.getDataProxyGroup())
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
index c8b8010d2..6ee58efd0 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.pojo.source.autopush;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -39,6 +41,15 @@ public class AutoPushSourceRequest extends SourceRequest {
     @ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
     private String dataProxyGroup;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding = StandardCharsets.UTF_8.toString();
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator = DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public AutoPushSourceRequest() {
         this.setSourceType(SourceType.AUTO_PUSH);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
index 2c9ca8004..74d480b35 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
@@ -81,6 +81,15 @@ public class KafkaSource extends StreamSource {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public KafkaSource() {
         this.setSourceType(SourceType.KAFKA);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
index 70d72cccc..e81c373a7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
@@ -90,6 +90,15 @@ public class KafkaSourceDTO {
     @ApiModelProperty("Field needed when serializationType is csv,json,avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     @ApiModelProperty("Properties for Kafka")
     private Map<String, Object> properties;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
index 2934fd83a..120567218 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.pojo.source.kafka;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -77,6 +79,15 @@ public class KafkaSourceRequest extends SourceRequest {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding = StandardCharsets.UTF_8.toString();
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator = DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public KafkaSourceRequest() {
         this.setSourceType(SourceType.KAFKA);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index 763aefb24..e1be7d1cb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -62,6 +62,15 @@ public class PulsarSource extends StreamSource {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     @ApiModelProperty("Configure the Source's startup mode. "
             + "Available options are earliest, latest, external-subscription, and specific-offsets.")
     @Builder.Default
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
index 602b6f74b..17c540a88 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
@@ -59,6 +59,15 @@ public class PulsarSourceDTO {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     @ApiModelProperty("Configure the Source's startup mode. "
             + "Available options are earliest, latest, external-subscription, and specific-offsets.")
     @Builder.Default
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
index 497ea2240..e883e1caa 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.pojo.source.pulsar;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -54,6 +56,15 @@ public class PulsarSourceRequest extends SourceRequest {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding = StandardCharsets.UTF_8.toString();
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator = DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     @ApiModelProperty("Configure the Source's startup mode."
             + " Available options are earliest, latest, external-subscription, and specific-offsets.")
     private String scanStartupMode = "earliest";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 1bc490fda..0b079c269 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -26,6 +26,7 @@ import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -152,8 +153,10 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
                 pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
             }
             if (DataTypeEnum.CSV.getName().equalsIgnoreCase(pulsarSource.getSerializationType())) {
-                Map<String, Object> properties = pulsarSource.getProperties();
-                properties.put(InlongConstants.FIELD_DELIMITER, streamInfo.getDataSeparator());
+                pulsarSource.setDataSeparator(streamInfo.getDataSeparator());
+                if (StringUtils.isEmpty(pulsarSource.getDataSeparator())) {
+                    pulsarSource.setDataSeparator(DataSeparator.COMMA.getAsciiCode().toString());
+                }
             }
             pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
             pulsarSource.setFieldList(streamInfo.getFieldList());