You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/31 06:11:58 UTC
[inlong] branch master updated: [INLONG-6276][Manager] Fix the problem of parsing the ASCII code when submitting the Sort task (#6277)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 39ad27232 [INLONG-6276][Manager] Fix the problem of parsing the ASCII code when submitting the Sort task (#6277)
39ad27232 is described below
commit 39ad272325bb9d6711ca42828c158292ba2641b3
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Mon Oct 31 14:11:51 2022 +0800
[INLONG-6276][Manager] Fix the problem of parsing the ASCII code when submitting the Sort task (#6277)
---
.../inlong/manager/common/enums/DataSeparator.java | 60 ----------------------
.../manager/pojo/sink/hive/HiveSinkRequest.java | 7 ++-
.../manager/pojo/sort/util/ExtractNodeUtils.java | 12 +++--
.../source/autopush/AutoPushSourceRequest.java | 6 +--
.../pojo/source/kafka/KafkaSourceRequest.java | 3 +-
.../pojo/source/pulsar/PulsarSourceRequest.java | 6 +--
.../manager/pojo/stream/InlongStreamRequest.java | 3 +-
.../source/pulsar/PulsarSourceOperator.java | 2 +-
.../inlong/manager/service/ServiceBaseTest.java | 4 +-
9 files changed, 23 insertions(+), 80 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataSeparator.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataSeparator.java
deleted file mode 100644
index 1b549401b..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataSeparator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.enums;
-
-/**
- * Enum of data separator and related ASCII code.
- */
-public enum DataSeparator {
-
- VERTICAL_BAR("|", 124),
- COMMA(",", 44),
- COLON(":", 58),
- SEMICOLON(";", 59),
- DASH("-", 45),
- SOH("\001", 1),
- STX("\002", 2),
- ETX("\003", 3),
- TAB("\t", 9);
-
- private final String separator;
-
- private final Integer asciiCode;
-
- DataSeparator(String separator, int asciiCode) {
- this.asciiCode = asciiCode;
- this.separator = separator;
- }
-
- public static DataSeparator forAscii(int asciiCode) {
- for (DataSeparator value : values()) {
- if (value.getAsciiCode() == asciiCode) {
- return value;
- }
- }
- throw new IllegalArgumentException(String.format("Unsupported ascii for %s", asciiCode));
- }
-
- public String getSeparator() {
- return this.separator;
- }
-
- public Integer getAsciiCode() {
- return this.asciiCode;
- }
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkRequest.java
index cfe4f9d31..bbba12f2d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkRequest.java
@@ -22,11 +22,10 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.DataSeparator;
-import org.apache.inlong.manager.common.enums.FileFormat;
import org.apache.inlong.manager.common.consts.SinkType;
-import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.enums.FileFormat;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
import javax.validation.constraints.NotBlank;
import java.nio.charset.StandardCharsets;
@@ -80,7 +79,7 @@ public class HiveSinkRequest extends SinkRequest {
private String dataEncoding = StandardCharsets.UTF_8.toString();
@ApiModelProperty("Data separator")
- private String dataSeparator = DataSeparator.SOH.getSeparator();
+ private String dataSeparator = String.valueOf((int) '\001');
@ApiModelProperty("Version for Hive, such as: 3.2.1")
private String hiveVersion;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index a59f80062..808eea29c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -247,7 +247,12 @@ public class ExtractNodeUtils {
DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
switch (dataType) {
case CSV:
- format = new CsvFormat(pulsarSource.getDataSeparator());
+ String separatorStr = pulsarSource.getDataSeparator();
+ if (StringUtils.isNumeric(separatorStr)) {
+ char dataSeparator = (char) Integer.parseInt(pulsarSource.getDataSeparator());
+ separatorStr = Character.toString(dataSeparator);
+ }
+ format = new CsvFormat(separatorStr);
break;
case AVRO:
format = new AvroFormat();
@@ -412,6 +417,7 @@ public class ExtractNodeUtils {
/**
* Create Redis extract node
+ *
* @param source redis source info
* @return redis extract source info
*/
@@ -420,8 +426,8 @@ public class ExtractNodeUtils {
Map<String, String> properties = parseProperties(source.getProperties());
RedisCommand command = RedisCommand.forName(source.getRedisCommand());
RedisMode mode = RedisMode.forName(source.getRedisMode());
- LookupOptions lookupOptions = new LookupOptions(source.getLookupCacheMaxRows(),source.getLookupCacheTtl(),
- source.getLookupMaxRetries(),source.getLookupAsync());
+ LookupOptions lookupOptions = new LookupOptions(source.getLookupCacheMaxRows(), source.getLookupCacheTtl(),
+ source.getLookupMaxRetries(), source.getLookupAsync());
return new RedisExtractNode(
source.getSourceName(),
source.getSourceName(),
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
index b9e08adc9..cbe774318 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.pojo.source.autopush;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.nio.charset.StandardCharsets;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
+import java.nio.charset.StandardCharsets;
+
/**
* AutoPush(DataProxy SDK) source request
*/
@@ -45,7 +45,7 @@ public class AutoPushSourceRequest extends SourceRequest {
private String dataEncoding = StandardCharsets.UTF_8.toString();
@ApiModelProperty(value = "Data separator")
- private String dataSeparator = DataSeparator.VERTICAL_BAR.getSeparator();
+ private String dataSeparator = String.valueOf((int) '|');
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
index 61beeaac5..4ecdc3f1b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
@@ -23,7 +23,6 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -87,7 +86,7 @@ public class KafkaSourceRequest extends SourceRequest {
private String dataEncoding = StandardCharsets.UTF_8.toString();
@ApiModelProperty(value = "Data separator")
- private String dataSeparator = DataSeparator.VERTICAL_BAR.getSeparator();
+ private String dataSeparator = String.valueOf((int) '|');
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
index 4d717dca2..d663643dd 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.pojo.source.pulsar;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.nio.charset.StandardCharsets;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
+import java.nio.charset.StandardCharsets;
+
/**
* Pulsar source request
*/
@@ -60,7 +60,7 @@ public class PulsarSourceRequest extends SourceRequest {
private String dataEncoding = StandardCharsets.UTF_8.toString();
@ApiModelProperty(value = "Data separator")
- private String dataSeparator = DataSeparator.VERTICAL_BAR.getSeparator();
+ private String dataSeparator = String.valueOf((int) '|');
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index cd149478d..855e7593a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.pojo.stream;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import org.apache.inlong.manager.common.enums.DataSeparator;
import org.hibernate.validator.constraints.Length;
import javax.validation.constraints.NotBlank;
@@ -65,7 +64,7 @@ public class InlongStreamRequest {
private String dataEncoding = StandardCharsets.UTF_8.toString();
@ApiModelProperty(value = "Data separator")
- private String dataSeparator = DataSeparator.VERTICAL_BAR.getSeparator();
+ private String dataSeparator = String.valueOf((int) '|');
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 4096b22d8..1b89c4b87 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -154,7 +154,7 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
if (DataTypeEnum.CSV.getName().equalsIgnoreCase(pulsarSource.getSerializationType())) {
pulsarSource.setDataSeparator(streamInfo.getDataSeparator());
if (StringUtils.isEmpty(pulsarSource.getDataSeparator())) {
- pulsarSource.setDataSeparator(",");
+ pulsarSource.setDataSeparator(String.valueOf((int) ','));
}
}
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index 6b7fdf133..da8665fb1 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -18,9 +18,9 @@
package org.apache.inlong.manager.service;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.none.InlongNoneMqInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
@@ -117,7 +117,7 @@ public class ServiceBaseTest extends BaseTest {
request.setInlongGroupId(inlongGroupId);
request.setInlongStreamId(inlongStreamId);
request.setMqResource(inlongStreamId);
- request.setDataSeparator("|");
+ request.setDataSeparator(String.valueOf((int) '|'));
request.setDataEncoding("UTF-8");
request.setFieldList(createStreamFields(inlongGroupId, inlongStreamId));
streamService.save(request, GLOBAL_OPERATOR);