Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/10 01:50:44 UTC

[incubator-inlong] branch master updated: [INLONG-702] sort config field splitter (#538)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b9f329a  [INLONG-702] sort config field splitter (#538)
b9f329a is described below

commit b9f329a8121828ec85c35d68b10da1f734c99ffc
Author: zhaolei-best <zh...@163.com>
AuthorDate: Sat Jul 10 09:50:34 2021 +0800

    [INLONG-702] sort config field splitter (#538)
    
    Co-authored-by: thirtyzhao <th...@tencent.com>
---
 .../sort/PushHiveConfigToSortEventListener.java    | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
index 4cac764..bd50ac5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
@@ -147,11 +147,18 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
                 .append(hiveStorage.getTableName())
                 .toString();
 
-        char splitter = ',';
-        if (hiveStorage.getFieldSplitter() != null) {
-            splitter = hiveStorage.getFieldSplitter().charAt(0);
+
+        // Encapsulate the deserialization information in the source
+        DataStreamInfo dataStream = dataStreamService.get(hiveStorage.getBusinessIdentifier(),
+                hiveStorage.getDataStreamIdentifier());
+
+        HiveSinkInfo.HiveFileFormat fileFormat = new HiveSinkInfo.TextFileFormat(',');
+        if (dataStream.getFileDelimiter() != null) {
+            char c = (char) Integer.parseInt(dataStream.getFileDelimiter());
+            fileFormat = new HiveSinkInfo.TextFileFormat(c);
         }
 
+
         // encapsulate hive sink
         HiveSinkInfo hiveSinkInfo = new HiveSinkInfo(
                 sinkFields.toArray(new FieldInfo[0]),
@@ -163,14 +170,9 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
                 dataPath,
                 Stream.of(new HiveSinkInfo.HiveTimePartitionInfo(hiveStorage.getPrimaryPartition(),
                         "yyyMddHH")).toArray(HiveSinkInfo.HivePartitionInfo[]::new),
-
-                new HiveSinkInfo.TextFileFormat(splitter)
+                fileFormat
         );
 
-        // Encapsulate the deserialization information in the source
-        DataStreamInfo dataStream = dataStreamService.get(hiveStorage.getBusinessIdentifier(),
-                hiveStorage.getDataStreamIdentifier());
-
         // data stream fields
         Stream<FieldInfo> streamFields = dataStream.getFieldList().stream().map(field -> {
             FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
@@ -178,13 +180,14 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
         });
 
         String topic = businessInfo.getMqResourceObj();
-        String consumerGroup = "cg";
+        String consumeGroupName = "sort_" + businessInfo.getMqResourceObj() + "_consumer_group";
         TDMsgCsvDeserializationInfo deserializationInfo = null;
         if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataStream.getDataType())) {
+            char c = (char) Integer.parseInt(dataStream.getFileDelimiter());
             deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getDataStreamIdentifier(),
-                    dataStream.getFileDelimiter().charAt(0));
+                    c);
         }
-        SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumerGroup,
+        SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumeGroupName,
                 deserializationInfo, streamFields.toArray(FieldInfo[]::new));
 
         // push information
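
Editor's note: for context, this patch changes how the field delimiter is obtained. It is now read from the data stream rather than from the Hive storage config, and it is interpreted as a numeric character code (Integer.parseInt cast to char, e.g. "124" for '|') instead of taking the first character of the string, with a comma used as the default TextFileFormat delimiter when none is configured. The sketch below is a minimal, standalone illustration of that decoding logic only; the class and method names (DelimiterUtils, decodeDelimiter) are hypothetical and are not part of the inlong-manager code.

    // Illustrative sketch only; mirrors the delimiter handling introduced by this patch.
    // DelimiterUtils / decodeDelimiter are hypothetical names, not InLong APIs.
    public final class DelimiterUtils {

        private DelimiterUtils() {
        }

        /**
         * Decode a delimiter stored as a numeric character code
         * (e.g. "44" -> ',', "124" -> '|'), falling back to a comma
         * when the value is missing, like the default TextFileFormat(',').
         */
        public static char decodeDelimiter(String fileDelimiter) {
            if (fileDelimiter == null || fileDelimiter.isEmpty()) {
                return ',';
            }
            return (char) Integer.parseInt(fileDelimiter.trim());
        }

        // Tiny usage example: prints ',' then '|'
        public static void main(String[] args) {
            System.out.println(decodeDelimiter(null));
            System.out.println(decodeDelimiter("124"));
        }
    }

The patch also replaces the hard-coded consumer group "cg" with a name derived from the business MQ resource ("sort_" + mqResourceObj + "_consumer_group"), as shown in the diff above.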