You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/01 04:39:24 UTC

[incubator-inlong] branch master updated: [INLONG-2793][Sort] Fix bugs related to hive sink (#2795)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e811097  [INLONG-2793][Sort] Fix bugs related to hive sink (#2795)
e811097 is described below

commit e811097560b67f488c4b952fe30fb8bf1001c712
Author: Kevin Wen <ke...@gmail.com>
AuthorDate: Tue Mar 1 12:39:16 2022 +0800

    [INLONG-2793][Sort] Fix bugs related to hive sink (#2795)
---
 .../java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java   | 2 +-
 .../main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java | 8 +++++++-
 .../sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java  | 2 +-
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
index 83d74d0..6b01199 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
@@ -249,7 +249,7 @@ public class HiveSinkInfo extends SinkInfo {
                 @JsonProperty("splitter") Character splitter,
                 @JsonProperty("compression_type") CompressionType compressionType) {
             this.splitter = splitter;
-            this.compressionType = compressionType;
+            this.compressionType = compressionType == null ? CompressionType.NONE : compressionType;
         }
 
         public TextFileFormat(@JsonProperty("splitter") Character splitter) {
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
index a04ed16..1aa1f86 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
@@ -89,7 +89,13 @@ public class Bucket<IN, BucketID> {
         this.fsWriter = checkNotNull(fsWriter);
         this.subtaskIndex = subtaskIndex;
         this.bucketId = checkNotNull(bucketId);
-        this.bucketPath = checkNotNull(bucketPath);
+
+        String bucketPathStr = checkNotNull(bucketPath).toString();
+        if (!bucketPathStr.endsWith("/")) {
+            bucketPathStr += "/";
+        }
+        this.bucketPath = new Path(bucketPathStr);
+
         this.partCounter = initialPartCounter;
         this.partFileFactory = checkNotNull(partFileFactory);
         this.rollingPolicy = checkNotNull(rollingPolicy);
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java
index 408c063..378f16c 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java
@@ -61,7 +61,7 @@ public class PulsarTestMetaManagerUtil extends TestMetaManagerUtil {
         };
 
         Map<String, Object> config = new HashMap<>();
-        config.put("pulsar.source.consumer.bootstrap-mode", "earliest");
+        config.put("consumer.bootstrap-mode", "earliest");
 
         return new DataFlowInfo(
                 dataFlowId,