You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/09 09:42:06 UTC
[incubator-inlong] branch master updated: [INLONG-693] change data type to tdmsg
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 479027c [INLONG-693] change data type to tdmsg
479027c is described below
commit 479027c8e51e95c70a541200ca63ff0b697b6c99
Author: thirtyzhao <th...@tencent.com>
AuthorDate: Fri Jul 9 16:53:18 2021 +0800
[INLONG-693] change data type to tdmsg
---
.../sort/PushHiveConfigToSortEventListener.java | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
index a16e7ca..99e7024 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
@@ -17,10 +17,6 @@
package org.apache.inlong.manager.service.thirdpart.sort;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
@@ -42,13 +38,19 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.extern.slf4j.Slf4j;
+
@Slf4j
@Component
public class PushHiveConfigToSortEventListener implements TaskEventListener {
@@ -173,9 +175,10 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
String topic = businessInfo.getMqResourceObj();
String consumerGroup = "cg";
- CsvDeserializationInfo deserializationInfo = null;
+ TDMsgCsvDeserializationInfo deserializationInfo = null;
if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataStream.getDataType())) {
- deserializationInfo = new CsvDeserializationInfo(dataStream.getFileDelimiter().charAt(0));
+ deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getDataStreamIdentifier(),
+ dataStream.getFileDelimiter().charAt(0));
}
SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumerGroup,
deserializationInfo, streamFields.toArray(FieldInfo[]::new));