You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 05:03:25 UTC

[inlong] 04/08: [INLONG-5689][Manager] PulsarSource set fieldDelimiter when use CSV format (#5690)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit edc0ae2190dddea89301a5001051d70bb1ac5014
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Thu Aug 25 17:39:38 2022 +0800

    [INLONG-5689][Manager] PulsarSource set fieldDelimiter when use CSV format (#5690)
---
 .../org/apache/inlong/manager/common/consts/InlongConstants.java     | 5 +++++
 .../org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java   | 5 ++++-
 .../inlong/manager/service/source/pulsar/PulsarSourceOperator.java   | 4 ++++
 .../java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java  | 5 +++++
 4 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 28c2435d2..5dc922009 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -110,4 +110,9 @@ public class InlongConstants {
 
     public static final String SORT_PROPERTIES = "sort.properties";
 
+    /**
+     * common config
+     */
+    public static final String FIELD_DELIMITER = "fieldDelimiter";
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 63ee044b6..626db78c6 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
@@ -231,7 +232,9 @@ public class ExtractNodeUtils {
         DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
         switch (dataType) {
             case CSV:
-                format = new CsvFormat();
+                String fieldDelimiter = (String) pulsarSource.getProperties()
+                        .get(InlongConstants.FIELD_DELIMITER);
+                format = StringUtils.isBlank(fieldDelimiter) ? new CsvFormat() : new CsvFormat(fieldDelimiter);
                 break;
             case AVRO:
                 format = new AvroFormat();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 03fb7f96b..1bc490fda 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -151,6 +151,10 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
             if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
                 pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
             }
+            if (DataTypeEnum.CSV.getName().equalsIgnoreCase(pulsarSource.getSerializationType())) {
+                Map<String, Object> properties = pulsarSource.getProperties();
+                properties.put(InlongConstants.FIELD_DELIMITER, streamInfo.getDataSeparator());
+            }
             pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
             pulsarSource.setFieldList(streamInfo.getFieldList());
             sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
index 60cfba67b..ae8aa763c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
@@ -79,6 +79,11 @@ public class CsvFormat implements Format {
         this(",", true, null, false, true, ";", null, null);
     }
 
+    @JsonCreator
+    public CsvFormat(String fieldDelimiter) {
+        this(fieldDelimiter, true, null, false, true, ";", null, null);
+    }
+
     @JsonIgnore
     @Override
     public String getFormat() {