You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 05:03:25 UTC
[inlong] 04/08: [INLONG-5689][Manager] PulsarSource set fieldDelimiter when use CSV format (#5690)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit edc0ae2190dddea89301a5001051d70bb1ac5014
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Thu Aug 25 17:39:38 2022 +0800
[INLONG-5689][Manager] PulsarSource set fieldDelimiter when use CSV format (#5690)
---
.../org/apache/inlong/manager/common/consts/InlongConstants.java | 5 +++++
.../org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java | 5 ++++-
.../inlong/manager/service/source/pulsar/PulsarSourceOperator.java | 4 ++++
.../java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java | 5 +++++
4 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 28c2435d2..5dc922009 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -110,4 +110,9 @@ public class InlongConstants {
public static final String SORT_PROPERTIES = "sort.properties";
+ /**
+ * common config
+ */
+ public static final String FIELD_DELIMITER = "fieldDelimiter";
+
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 63ee044b6..626db78c6 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
@@ -231,7 +232,9 @@ public class ExtractNodeUtils {
DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
switch (dataType) {
case CSV:
- format = new CsvFormat();
+ String fieldDelimiter = (String) pulsarSource.getProperties()
+ .get(InlongConstants.FIELD_DELIMITER);
+ format = StringUtils.isBlank(fieldDelimiter) ? new CsvFormat() : new CsvFormat(fieldDelimiter);
break;
case AVRO:
format = new AvroFormat();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 03fb7f96b..1bc490fda 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -151,6 +151,10 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
}
+ if (DataTypeEnum.CSV.getName().equalsIgnoreCase(pulsarSource.getSerializationType())) {
+ Map<String, Object> properties = pulsarSource.getProperties();
+ properties.put(InlongConstants.FIELD_DELIMITER, streamInfo.getDataSeparator());
+ }
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
pulsarSource.setFieldList(streamInfo.getFieldList());
sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
index 60cfba67b..ae8aa763c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
@@ -79,6 +79,11 @@ public class CsvFormat implements Format {
this(",", true, null, false, true, ";", null, null);
}
+ @JsonCreator
+ public CsvFormat(String fieldDelimiter) {
+ this(fieldDelimiter, true, null, false, true, ";", null, null);
+ }
+
@JsonIgnore
@Override
public String getFormat() {