You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/09 09:42:06 UTC

[incubator-inlong] branch master updated: [INLONG-693] change data type to tdmsg

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 479027c  [INLONG-693] change data type to tdmsg
479027c is described below

commit 479027c8e51e95c70a541200ca63ff0b697b6c99
Author: thirtyzhao <th...@tencent.com>
AuthorDate: Fri Jul 9 16:53:18 2021 +0800

    [INLONG-693] change data type to tdmsg
---
 .../sort/PushHiveConfigToSortEventListener.java         | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
index a16e7ca..99e7024 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.manager.service.thirdpart.sort;
 
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
@@ -42,13 +38,19 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.extern.slf4j.Slf4j;
+
 @Slf4j
 @Component
 public class PushHiveConfigToSortEventListener implements TaskEventListener {
@@ -173,9 +175,10 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
 
         String topic = businessInfo.getMqResourceObj();
         String consumerGroup = "cg";
-        CsvDeserializationInfo deserializationInfo = null;
+        TDMsgCsvDeserializationInfo deserializationInfo = null;
         if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataStream.getDataType())) {
-            deserializationInfo = new CsvDeserializationInfo(dataStream.getFileDelimiter().charAt(0));
+            deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getDataStreamIdentifier(),
+                    dataStream.getFileDelimiter().charAt(0));
         }
         SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumerGroup,
                 deserializationInfo, streamFields.toArray(FieldInfo[]::new));