You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/16 12:36:18 UTC
[incubator-inlong] branch master updated: [INLONG-3166][Manager] Replace hdfsDefaultFs and warehouseDir of Hive sink with dataPath field (#3170)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9402610 [INLONG-3166][Manager] Replace hdfsDefaultFs and warehouseDir of Hive sink with dataPath field (#3170)
9402610 is described below
commit 940261088cdfb8ae086c2042cb42327d3404bf4c
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 16 20:36:12 2022 +0800
[INLONG-3166][Manager] Replace hdfsDefaultFs and warehouseDir of Hive sink with dataPath field (#3170)
---
.../inlong/manager/client/api/sink/HiveSink.java | 7 ++-----
.../client/api/util/InlongStreamSinkTransfer.java | 9 +++------
.../manager/common/pojo/sink/hive/HiveSinkDTO.java | 12 ++++--------
.../common/pojo/sink/hive/HiveSinkListResponse.java | 4 ++--
.../common/pojo/sink/hive/HiveSinkRequest.java | 7 ++-----
.../common/pojo/sink/hive/HiveSinkResponse.java | 15 ++++++---------
.../service/thirdparty/sort/util/SinkInfoUtils.java | 19 +++++--------------
.../service/thirdparty/sort/DisableZkForSortTest.java | 12 ++++++------
8 files changed, 30 insertions(+), 55 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index 16fdbe2..34e395c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -57,11 +57,8 @@ public class HiveSink extends StreamSink {
@ApiModelProperty("Target table name")
private String tableName;
- @ApiModelProperty("HDFS defaultFS, etc hdfs://${ip}:${port}")
- private String hdfsDefaultFs;
-
- @ApiModelProperty("Warehouse directory, etc /usr/hive/warehouse")
- private String warehouseDir;
+ @ApiModelProperty("Data path, such as: hdfs://ip:port/usr/hive/warehouse/test.db")
+ private String dataPath;
@ApiModelProperty("Data encoding format: UTF-8, GBK")
private Charset charset = StandardCharsets.UTF_8;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 9456cc1..28a138c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -211,9 +211,8 @@ public class InlongStreamSinkTransfer {
hiveSinkRequest.setDataSeparator(String.valueOf(hiveSink.getDataSeparator().getAsciiCode()));
hiveSinkRequest.setDbName(hiveSink.getDbName());
hiveSinkRequest.setTableName(hiveSink.getTableName());
- hiveSinkRequest.setHdfsDefaultFs(hiveSink.getHdfsDefaultFs());
+ hiveSinkRequest.setDataPath(hiveSink.getDataPath());
hiveSinkRequest.setJdbcUrl(hiveSink.getJdbcUrl());
- hiveSinkRequest.setWarehouseDir(hiveSink.getWarehouseDir());
hiveSinkRequest.setFileFormat(hiveSink.getFileFormat().name());
hiveSinkRequest.setSinkType(hiveSink.getSinkType().name());
DefaultAuthentication defaultAuthentication = hiveSink.getAuthentication();
@@ -256,12 +255,11 @@ public class InlongStreamSinkTransfer {
hiveSink.setDataSeparator(snapshot.getDataSeparator());
hiveSink.setCharset(snapshot.getCharset());
hiveSink.setAuthentication(snapshot.getAuthentication());
- hiveSink.setWarehouseDir(snapshot.getWarehouseDir());
hiveSink.setFileFormat(snapshot.getFileFormat());
hiveSink.setJdbcUrl(snapshot.getJdbcUrl());
hiveSink.setTableName(snapshot.getTableName());
hiveSink.setDbName(snapshot.getDbName());
- hiveSink.setHdfsDefaultFs(snapshot.getHdfsDefaultFs());
+ hiveSink.setDataPath(snapshot.getDataPath());
hiveSink.setSecondaryPartition(snapshot.getSecondaryPartition());
hiveSink.setPrimaryPartition(snapshot.getPrimaryPartition());
} else {
@@ -271,12 +269,11 @@ public class InlongStreamSinkTransfer {
String password = sinkResponse.getPassword();
String uname = sinkResponse.getUsername();
hiveSink.setAuthentication(new DefaultAuthentication(uname, password));
- hiveSink.setWarehouseDir(sinkResponse.getWarehouseDir());
hiveSink.setFileFormat(FileFormat.forName(sinkResponse.getFileFormat()));
hiveSink.setJdbcUrl(sinkResponse.getJdbcUrl());
hiveSink.setTableName(sinkResponse.getTableName());
hiveSink.setDbName(sinkResponse.getDbName());
- hiveSink.setHdfsDefaultFs(sinkResponse.getHdfsDefaultFs());
+ hiveSink.setDataPath(sinkResponse.getDataPath());
hiveSink.setSecondaryPartition(sinkResponse.getSecondaryPartition());
hiveSink.setPrimaryPartition(sinkResponse.getPrimaryPartition());
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
index 65f3ea8..d1293d3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.common.pojo.sink.hive;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -29,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.Map;
/**
* Hive sink info
@@ -56,11 +56,8 @@ public class HiveSinkDTO {
@ApiModelProperty("Target table name")
private String tableName;
- @ApiModelProperty("HDFS defaultFS")
- private String hdfsDefaultFs;
-
- @ApiModelProperty("Warehouse directory")
- private String warehouseDir;
+ @ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
+ private String dataPath;
@ApiModelProperty("Partition interval, support: 1 H, 1 D, 30 I, 10 I")
private Integer partitionInterval;
@@ -99,8 +96,7 @@ public class HiveSinkDTO {
.password(request.getPassword())
.dbName(request.getDbName())
.tableName(request.getTableName())
- .hdfsDefaultFs(request.getHdfsDefaultFs())
- .warehouseDir(request.getWarehouseDir())
+ .dataPath(request.getDataPath())
.partitionInterval(request.getPartitionInterval())
.partitionUnit(request.getPartitionUnit())
.primaryPartition(request.getPrimaryPartition())
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkListResponse.java
index 66fee76..a6c98c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkListResponse.java
@@ -43,8 +43,8 @@ public class HiveSinkListResponse extends SinkListResponse {
@ApiModelProperty("JDBC URL")
private String jdbcUrl;
- @ApiModelProperty("HDFS defaultFS")
- private String hdfsDefaultFs;
+ @ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
+ private String dataPath;
@ApiModelProperty("partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation")
private String partitionType;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
index 581f780..54dbf64 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
@@ -54,11 +54,8 @@ public class HiveSinkRequest extends SinkRequest {
@ApiModelProperty("Target table name")
private String tableName;
- @ApiModelProperty("HDFS defaultFS")
- private String hdfsDefaultFs;
-
- @ApiModelProperty("Warehouse directory")
- private String warehouseDir;
+ @ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
+ private String dataPath;
@ApiModelProperty("Partition interval, support: 1 H, 1 D, 30 I, 10 I")
private Integer partitionInterval;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
index 0195d0e..135ba95 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
@@ -34,10 +34,6 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
@ApiModel(value = "Response of the Hive sink")
public class HiveSinkResponse extends SinkResponse {
- public HiveSinkResponse() {
- this.sinkType = Constant.SINK_HIVE;
- }
-
@ApiModelProperty("Hive JDBC URL")
private String jdbcUrl;
@@ -53,11 +49,8 @@ public class HiveSinkResponse extends SinkResponse {
@ApiModelProperty("Target table name")
private String tableName;
- @ApiModelProperty("HDFS defaultFS")
- private String hdfsDefaultFs;
-
- @ApiModelProperty("Warehouse directory")
- private String warehouseDir;
+ @ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
+ private String dataPath;
@ApiModelProperty("Partition interval, support: 1 H, 1 D, 30 I, 10 I")
private Integer partitionInterval;
@@ -83,4 +76,8 @@ public class HiveSinkResponse extends SinkResponse {
@ApiModelProperty("Data field separator")
private String dataSeparator;
+ public HiveSinkResponse() {
+ this.sinkType = Constant.SINK_HIVE;
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index f2212e0..f92f3be 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -162,22 +162,13 @@ public class SinkInfoUtils {
partitionList.add(new HiveSinkInfo.HiveFieldPartitionInfo(hiveInfo.getSecondaryPartition()));
}
- // dataPath = hdfsUrl + / + warehouseDir + / + dbName + .db/ + tableName
+ // dataPath = dataPath + / + tableName
StringBuilder dataPathBuilder = new StringBuilder();
- String hdfsUrl = hiveInfo.getHdfsDefaultFs();
- String warehouseDir = hiveInfo.getWarehouseDir();
- if (hdfsUrl.endsWith("/")) {
- dataPathBuilder.append(hdfsUrl, 0, hdfsUrl.length() - 1);
- } else {
- dataPathBuilder.append(hdfsUrl);
- }
- if (warehouseDir.endsWith("/")) {
- dataPathBuilder.append(warehouseDir, 0, warehouseDir.length() - 1);
- } else {
- dataPathBuilder.append(warehouseDir);
+ String dataPath = hiveInfo.getDataPath();
+ if (!dataPath.endsWith("/")) {
+ dataPathBuilder.append(dataPath).append("/");
}
- String dataPath = dataPathBuilder.append("/").append(hiveInfo.getDbName())
- .append(".db/").append(hiveInfo.getTableName()).toString();
+ dataPath = dataPathBuilder.append(hiveInfo.getTableName()).toString();
return new HiveSinkInfo(sinkFields.toArray(new FieldInfo[0]), hiveInfo.getJdbcUrl(),
hiveInfo.getDbName(), hiveInfo.getTableName(), hiveInfo.getUsername(), hiveInfo.getPassword(),
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
index 98abd11..cdf2ffd 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
@@ -18,9 +18,6 @@
package org.apache.inlong.manager.service.thirdparty.sort;
import com.google.common.collect.Lists;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -50,6 +47,10 @@ import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+
public class DisableZkForSortTest extends WorkflowServiceImplTest {
@@ -85,10 +86,9 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
hiveSinkRequest.setPassword("password");
hiveSinkRequest.setDbName("default");
hiveSinkRequest.setTableName("kip_test");
- hiveSinkRequest.setJdbcUrl("jdbc:hive2://172.17.12.135:7001");
+ hiveSinkRequest.setJdbcUrl("jdbc:hive2://localhost:7001");
hiveSinkRequest.setFileFormat("TextFile");
- hiveSinkRequest.setHdfsDefaultFs("hdfs://172.17.12.235:4007");
- hiveSinkRequest.setWarehouseDir("/user/hive/warehouse");
+ hiveSinkRequest.setDataPath("hdfs://localhost:4007/user/hive/warehouse/default");
hiveSinkRequest.setFileFormat(StandardCharsets.UTF_8.name());
hiveSinkRequest.setDataSeparator("124");
streamSinkService.save(hiveSinkRequest, OPERATOR);