You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/10 01:50:44 UTC
[incubator-inlong] branch master updated: [INLONG-702] sort config
field splitter (#538)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b9f329a [INLONG-702] sort config field splitter (#538)
b9f329a is described below
commit b9f329a8121828ec85c35d68b10da1f734c99ffc
Author: zhaolei-best <zh...@163.com>
AuthorDate: Sat Jul 10 09:50:34 2021 +0800
[INLONG-702] sort config field splitter (#538)
Co-authored-by: thirtyzhao <th...@tencent.com>
---
.../sort/PushHiveConfigToSortEventListener.java | 27 ++++++++++++----------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
index 4cac764..bd50ac5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
@@ -147,11 +147,18 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
.append(hiveStorage.getTableName())
.toString();
- char splitter = ',';
- if (hiveStorage.getFieldSplitter() != null) {
- splitter = hiveStorage.getFieldSplitter().charAt(0);
+
+ // Encapsulate the deserialization information in the source
+ DataStreamInfo dataStream = dataStreamService.get(hiveStorage.getBusinessIdentifier(),
+ hiveStorage.getDataStreamIdentifier());
+
+ HiveSinkInfo.HiveFileFormat fileFormat = new HiveSinkInfo.TextFileFormat(',');
+ if (dataStream.getFileDelimiter() != null) {
+ char c = (char) Integer.parseInt(dataStream.getFileDelimiter());
+ fileFormat = new HiveSinkInfo.TextFileFormat(c);
}
+
// encapsulate hive sink
HiveSinkInfo hiveSinkInfo = new HiveSinkInfo(
sinkFields.toArray(new FieldInfo[0]),
@@ -163,14 +170,9 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
dataPath,
Stream.of(new HiveSinkInfo.HiveTimePartitionInfo(hiveStorage.getPrimaryPartition(),
"yyyMddHH")).toArray(HiveSinkInfo.HivePartitionInfo[]::new),
-
- new HiveSinkInfo.TextFileFormat(splitter)
+ fileFormat
);
- // Encapsulate the deserialization information in the source
- DataStreamInfo dataStream = dataStreamService.get(hiveStorage.getBusinessIdentifier(),
- hiveStorage.getDataStreamIdentifier());
-
// data stream fields
Stream<FieldInfo> streamFields = dataStream.getFieldList().stream().map(field -> {
FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
@@ -178,13 +180,14 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
});
String topic = businessInfo.getMqResourceObj();
- String consumerGroup = "cg";
+ String consumeGroupName = "sort_" + businessInfo.getMqResourceObj() + "_consumer_group";
TDMsgCsvDeserializationInfo deserializationInfo = null;
if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataStream.getDataType())) {
+ char c = (char) Integer.parseInt(dataStream.getFileDelimiter());
deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getDataStreamIdentifier(),
- dataStream.getFileDelimiter().charAt(0));
+ c);
}
- SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumerGroup,
+ SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumeGroupName,
deserializationInfo, streamFields.toArray(FieldInfo[]::new));
// push information