You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/10 14:53:24 UTC

[incubator-inlong] branch master updated: [INLONG-712] adjust partition info for hive sink (#542)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a8372fd  [INLONG-712] adjust partition info for hive sink (#542)
a8372fd is described below

commit a8372fd014268ed9ce68d2512efaa8f77d25d1f2
Author: healchow <he...@gmail.com>
AuthorDate: Sat Jul 10 22:53:15 2021 +0800

    [INLONG-712] adjust partition info for hive sink (#542)
    
    Co-authored-by: healzhou <he...@tencent.com>
---
 .../sort/PushHiveConfigToSortEventListener.java         | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
index bd50ac5..4b35ea7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
@@ -17,6 +17,10 @@
 
 package org.apache.inlong.manager.service.thirdpart.sort;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
@@ -45,12 +49,6 @@ import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import lombok.extern.slf4j.Slf4j;
-
 @Slf4j
 @Component
 public class PushHiveConfigToSortEventListener implements TaskEventListener {
@@ -147,7 +145,6 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
                 .append(hiveStorage.getTableName())
                 .toString();
 
-
         // Encapsulate the deserialization information in the source
         DataStreamInfo dataStream = dataStreamService.get(hiveStorage.getBusinessIdentifier(),
                 hiveStorage.getDataStreamIdentifier());
@@ -158,7 +155,6 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
             fileFormat = new HiveSinkInfo.TextFileFormat(c);
         }
 
-
         // encapsulate hive sink
         HiveSinkInfo hiveSinkInfo = new HiveSinkInfo(
                 sinkFields.toArray(new FieldInfo[0]),
@@ -169,7 +165,7 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
                 hiveStorage.getPassword(),
                 dataPath,
                 Stream.of(new HiveSinkInfo.HiveTimePartitionInfo(hiveStorage.getPrimaryPartition(),
-                        "yyyMddHH")).toArray(HiveSinkInfo.HivePartitionInfo[]::new),
+                        "yyyyMMddHH")).toArray(HiveSinkInfo.HivePartitionInfo[]::new),
                 fileFormat
         );
 
@@ -184,8 +180,7 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
         TDMsgCsvDeserializationInfo deserializationInfo = null;
         if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataStream.getDataType())) {
             char c = (char) Integer.parseInt(dataStream.getFileDelimiter());
-            deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getDataStreamIdentifier(),
-                    c);
+            deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getDataStreamIdentifier(), c);
         }
         SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumeGroupName,
                 deserializationInfo, streamFields.toArray(FieldInfo[]::new));