You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/10 14:53:24 UTC
[incubator-inlong] branch master updated: [INLONG-712] adjust
partition info for hive sink (#542)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a8372fd [INLONG-712] adjust partition info for hive sink (#542)
a8372fd is described below
commit a8372fd014268ed9ce68d2512efaa8f77d25d1f2
Author: healchow <he...@gmail.com>
AuthorDate: Sat Jul 10 22:53:15 2021 +0800
[INLONG-712] adjust partition info for hive sink (#542)
Co-authored-by: healzhou <he...@tencent.com>
---
.../sort/PushHiveConfigToSortEventListener.java | 17 ++++++-----------
1 file changed, 6 insertions(+), 11 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
index bd50ac5..4b35ea7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
@@ -17,6 +17,10 @@
package org.apache.inlong.manager.service.thirdpart.sort;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
@@ -45,12 +49,6 @@ import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import lombok.extern.slf4j.Slf4j;
-
@Slf4j
@Component
public class PushHiveConfigToSortEventListener implements TaskEventListener {
@@ -147,7 +145,6 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
.append(hiveStorage.getTableName())
.toString();
-
// Encapsulate the deserialization information in the source
DataStreamInfo dataStream = dataStreamService.get(hiveStorage.getBusinessIdentifier(),
hiveStorage.getDataStreamIdentifier());
@@ -158,7 +155,6 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
fileFormat = new HiveSinkInfo.TextFileFormat(c);
}
-
// encapsulate hive sink
HiveSinkInfo hiveSinkInfo = new HiveSinkInfo(
sinkFields.toArray(new FieldInfo[0]),
@@ -169,7 +165,7 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
hiveStorage.getPassword(),
dataPath,
Stream.of(new HiveSinkInfo.HiveTimePartitionInfo(hiveStorage.getPrimaryPartition(),
- "yyyMddHH")).toArray(HiveSinkInfo.HivePartitionInfo[]::new),
+ "yyyyMMddHH")).toArray(HiveSinkInfo.HivePartitionInfo[]::new),
fileFormat
);
@@ -184,8 +180,7 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
TDMsgCsvDeserializationInfo deserializationInfo = null;
if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataStream.getDataType())) {
char c = (char) Integer.parseInt(dataStream.getFileDelimiter());
- deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getDataStreamIdentifier(),
- c);
+ deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getDataStreamIdentifier(), c);
}
SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumeGroupName,
deserializationInfo, streamFields.toArray(FieldInfo[]::new));