You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/31 06:11:58 UTC

[inlong] branch master updated: [INLONG-6276][Manager] Fix the problem of parsing the ASCII code when submitting the Sort task (#6277)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 39ad27232 [INLONG-6276][Manager] Fix the problem of parsing the ASCII code when submitting the Sort task (#6277)
39ad27232 is described below

commit 39ad272325bb9d6711ca42828c158292ba2641b3
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Mon Oct 31 14:11:51 2022 +0800

    [INLONG-6276][Manager] Fix the problem of parsing the ASCII code when submitting the Sort task (#6277)
---
 .../inlong/manager/common/enums/DataSeparator.java | 60 ----------------------
 .../manager/pojo/sink/hive/HiveSinkRequest.java    |  7 ++-
 .../manager/pojo/sort/util/ExtractNodeUtils.java   | 12 +++--
 .../source/autopush/AutoPushSourceRequest.java     |  6 +--
 .../pojo/source/kafka/KafkaSourceRequest.java      |  3 +-
 .../pojo/source/pulsar/PulsarSourceRequest.java    |  6 +--
 .../manager/pojo/stream/InlongStreamRequest.java   |  3 +-
 .../source/pulsar/PulsarSourceOperator.java        |  2 +-
 .../inlong/manager/service/ServiceBaseTest.java    |  4 +-
 9 files changed, 23 insertions(+), 80 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataSeparator.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataSeparator.java
deleted file mode 100644
index 1b549401b..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataSeparator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.enums;
-
-/**
- * Enum of data separator and related ASCII code.
- */
-public enum DataSeparator {
-
-    VERTICAL_BAR("|", 124),
-    COMMA(",", 44),
-    COLON(":", 58),
-    SEMICOLON(";", 59),
-    DASH("-", 45),
-    SOH("\001", 1),
-    STX("\002", 2),
-    ETX("\003", 3),
-    TAB("\t", 9);
-
-    private final String separator;
-
-    private final Integer asciiCode;
-
-    DataSeparator(String separator, int asciiCode) {
-        this.asciiCode = asciiCode;
-        this.separator = separator;
-    }
-
-    public static DataSeparator forAscii(int asciiCode) {
-        for (DataSeparator value : values()) {
-            if (value.getAsciiCode() == asciiCode) {
-                return value;
-            }
-        }
-        throw new IllegalArgumentException(String.format("Unsupported ascii for %s", asciiCode));
-    }
-
-    public String getSeparator() {
-        return this.separator;
-    }
-
-    public Integer getAsciiCode() {
-        return this.asciiCode;
-    }
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkRequest.java
index cfe4f9d31..bbba12f2d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkRequest.java
@@ -22,11 +22,10 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.DataSeparator;
-import org.apache.inlong.manager.common.enums.FileFormat;
 import org.apache.inlong.manager.common.consts.SinkType;
-import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.enums.FileFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
 
 import javax.validation.constraints.NotBlank;
 import java.nio.charset.StandardCharsets;
@@ -80,7 +79,7 @@ public class HiveSinkRequest extends SinkRequest {
     private String dataEncoding = StandardCharsets.UTF_8.toString();
 
     @ApiModelProperty("Data separator")
-    private String dataSeparator = DataSeparator.SOH.getSeparator();
+    private String dataSeparator = String.valueOf((int) '\001');
 
     @ApiModelProperty("Version for Hive, such as: 3.2.1")
     private String hiveVersion;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index a59f80062..808eea29c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -247,7 +247,12 @@ public class ExtractNodeUtils {
         DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
         switch (dataType) {
             case CSV:
-                format = new CsvFormat(pulsarSource.getDataSeparator());
+                String separatorStr = pulsarSource.getDataSeparator();
+                if (StringUtils.isNumeric(separatorStr)) {
+                    char dataSeparator = (char) Integer.parseInt(pulsarSource.getDataSeparator());
+                    separatorStr = Character.toString(dataSeparator);
+                }
+                format = new CsvFormat(separatorStr);
                 break;
             case AVRO:
                 format = new AvroFormat();
@@ -412,6 +417,7 @@ public class ExtractNodeUtils {
 
     /**
      * Create Redis extract node
+     *
      * @param source redis source info
      * @return redis extract source info
      */
@@ -420,8 +426,8 @@ public class ExtractNodeUtils {
         Map<String, String> properties = parseProperties(source.getProperties());
         RedisCommand command = RedisCommand.forName(source.getRedisCommand());
         RedisMode mode = RedisMode.forName(source.getRedisMode());
-        LookupOptions lookupOptions = new LookupOptions(source.getLookupCacheMaxRows(),source.getLookupCacheTtl(),
-                source.getLookupMaxRetries(),source.getLookupAsync());
+        LookupOptions lookupOptions = new LookupOptions(source.getLookupCacheMaxRows(), source.getLookupCacheTtl(),
+                source.getLookupMaxRetries(), source.getLookupAsync());
         return new RedisExtractNode(
                 source.getSourceName(),
                 source.getSourceName(),
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
index b9e08adc9..cbe774318 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.pojo.source.autopush;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.nio.charset.StandardCharsets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
+import java.nio.charset.StandardCharsets;
+
 /**
  * AutoPush(DataProxy SDK) source request
  */
@@ -45,7 +45,7 @@ public class AutoPushSourceRequest extends SourceRequest {
     private String dataEncoding = StandardCharsets.UTF_8.toString();
 
     @ApiModelProperty(value = "Data separator")
-    private String dataSeparator = DataSeparator.VERTICAL_BAR.getSeparator();
+    private String dataSeparator = String.valueOf((int) '|');
 
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
index 61beeaac5..4ecdc3f1b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
@@ -23,7 +23,6 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -87,7 +86,7 @@ public class KafkaSourceRequest extends SourceRequest {
     private String dataEncoding = StandardCharsets.UTF_8.toString();
 
     @ApiModelProperty(value = "Data separator")
-    private String dataSeparator = DataSeparator.VERTICAL_BAR.getSeparator();
+    private String dataSeparator = String.valueOf((int) '|');
 
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
index 4d717dca2..d663643dd 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.pojo.source.pulsar;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.nio.charset.StandardCharsets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
+import java.nio.charset.StandardCharsets;
+
 /**
  * Pulsar source request
  */
@@ -60,7 +60,7 @@ public class PulsarSourceRequest extends SourceRequest {
     private String dataEncoding = StandardCharsets.UTF_8.toString();
 
     @ApiModelProperty(value = "Data separator")
-    private String dataSeparator = DataSeparator.VERTICAL_BAR.getSeparator();
+    private String dataSeparator = String.valueOf((int) '|');
 
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index cd149478d..855e7593a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.pojo.stream;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotBlank;
@@ -65,7 +64,7 @@ public class InlongStreamRequest {
     private String dataEncoding = StandardCharsets.UTF_8.toString();
 
     @ApiModelProperty(value = "Data separator")
-    private String dataSeparator = DataSeparator.VERTICAL_BAR.getSeparator();
+    private String dataSeparator = String.valueOf((int) '|');
 
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 4096b22d8..1b89c4b87 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -154,7 +154,7 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
             if (DataTypeEnum.CSV.getName().equalsIgnoreCase(pulsarSource.getSerializationType())) {
                 pulsarSource.setDataSeparator(streamInfo.getDataSeparator());
                 if (StringUtils.isEmpty(pulsarSource.getDataSeparator())) {
-                    pulsarSource.setDataSeparator(",");
+                    pulsarSource.setDataSeparator(String.valueOf((int) ','));
                 }
             }
             pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index 6b7fdf133..da8665fb1 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -18,9 +18,9 @@
 package org.apache.inlong.manager.service;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.none.InlongNoneMqInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
@@ -117,7 +117,7 @@ public class ServiceBaseTest extends BaseTest {
         request.setInlongGroupId(inlongGroupId);
         request.setInlongStreamId(inlongStreamId);
         request.setMqResource(inlongStreamId);
-        request.setDataSeparator("|");
+        request.setDataSeparator(String.valueOf((int) '|'));
         request.setDataEncoding("UTF-8");
         request.setFieldList(createStreamFields(inlongGroupId, inlongStreamId));
         streamService.save(request, GLOBAL_OPERATOR);