You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/01 04:39:24 UTC
[incubator-inlong] branch master updated: [INLONG-2793][Sort] Fix bugs related to hive sink (#2795)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e811097 [INLONG-2793][Sort] Fix bugs related to hive sink (#2795)
e811097 is described below
commit e811097560b67f488c4b952fe30fb8bf1001c712
Author: Kevin Wen <ke...@gmail.com>
AuthorDate: Tue Mar 1 12:39:16 2022 +0800
[INLONG-2793][Sort] Fix bugs related to hive sink (#2795)
---
.../java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java | 2 +-
.../main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java | 8 +++++++-
.../sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java | 2 +-
3 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
index 83d74d0..6b01199 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
@@ -249,7 +249,7 @@ public class HiveSinkInfo extends SinkInfo {
@JsonProperty("splitter") Character splitter,
@JsonProperty("compression_type") CompressionType compressionType) {
this.splitter = splitter;
- this.compressionType = compressionType;
+ this.compressionType = compressionType == null ? CompressionType.NONE : compressionType;
}
public TextFileFormat(@JsonProperty("splitter") Character splitter) {
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
index a04ed16..1aa1f86 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/filesystem/Bucket.java
@@ -89,7 +89,13 @@ public class Bucket<IN, BucketID> {
this.fsWriter = checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = checkNotNull(bucketId);
- this.bucketPath = checkNotNull(bucketPath);
+
+ String bucketPathStr = checkNotNull(bucketPath).toString();
+ if (!bucketPathStr.endsWith("/")) {
+ bucketPathStr += "/";
+ }
+ this.bucketPath = new Path(bucketPathStr);
+
this.partCounter = initialPartCounter;
this.partFileFactory = checkNotNull(partFileFactory);
this.rollingPolicy = checkNotNull(rollingPolicy);
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java
index 408c063..378f16c 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/multitenant/pulsar/PulsarTestMetaManagerUtil.java
@@ -61,7 +61,7 @@ public class PulsarTestMetaManagerUtil extends TestMetaManagerUtil {
};
Map<String, Object> config = new HashMap<>();
- config.put("pulsar.source.consumer.bootstrap-mode", "earliest");
+ config.put("consumer.bootstrap-mode", "earliest");
return new DataFlowInfo(
dataFlowId,