You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/16 12:36:18 UTC

[incubator-inlong] branch master updated: [INLONG-3166][Manager] Replace hdfsDefaultFs and warehouseDir of Hive sink with dataPath field (#3170)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9402610  [INLONG-3166][Manager] Replace hdfsDefaultFs and warehouseDir of Hive sink with dataPath field (#3170)
9402610 is described below

commit 940261088cdfb8ae086c2042cb42327d3404bf4c
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 16 20:36:12 2022 +0800

    [INLONG-3166][Manager] Replace hdfsDefaultFs and warehouseDir of Hive sink with dataPath field (#3170)
---
 .../inlong/manager/client/api/sink/HiveSink.java      |  7 ++-----
 .../client/api/util/InlongStreamSinkTransfer.java     |  9 +++------
 .../manager/common/pojo/sink/hive/HiveSinkDTO.java    | 12 ++++--------
 .../common/pojo/sink/hive/HiveSinkListResponse.java   |  4 ++--
 .../common/pojo/sink/hive/HiveSinkRequest.java        |  7 ++-----
 .../common/pojo/sink/hive/HiveSinkResponse.java       | 15 ++++++---------
 .../service/thirdparty/sort/util/SinkInfoUtils.java   | 19 +++++--------------
 .../service/thirdparty/sort/DisableZkForSortTest.java | 12 ++++++------
 8 files changed, 30 insertions(+), 55 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index 16fdbe2..34e395c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -57,11 +57,8 @@ public class HiveSink extends StreamSink {
     @ApiModelProperty("Target table name")
     private String tableName;
 
-    @ApiModelProperty("HDFS defaultFS, etc hdfs://${ip}:${port}")
-    private String hdfsDefaultFs;
-
-    @ApiModelProperty("Warehouse directory, etc /usr/hive/warehouse")
-    private String warehouseDir;
+    @ApiModelProperty("Data path, such as: hdfs://ip:port/usr/hive/warehouse/test.db")
+    private String dataPath;
 
     @ApiModelProperty("Data encoding format: UTF-8, GBK")
     private Charset charset = StandardCharsets.UTF_8;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 9456cc1..28a138c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -211,9 +211,8 @@ public class InlongStreamSinkTransfer {
         hiveSinkRequest.setDataSeparator(String.valueOf(hiveSink.getDataSeparator().getAsciiCode()));
         hiveSinkRequest.setDbName(hiveSink.getDbName());
         hiveSinkRequest.setTableName(hiveSink.getTableName());
-        hiveSinkRequest.setHdfsDefaultFs(hiveSink.getHdfsDefaultFs());
+        hiveSinkRequest.setDataPath(hiveSink.getDataPath());
         hiveSinkRequest.setJdbcUrl(hiveSink.getJdbcUrl());
-        hiveSinkRequest.setWarehouseDir(hiveSink.getWarehouseDir());
         hiveSinkRequest.setFileFormat(hiveSink.getFileFormat().name());
         hiveSinkRequest.setSinkType(hiveSink.getSinkType().name());
         DefaultAuthentication defaultAuthentication = hiveSink.getAuthentication();
@@ -256,12 +255,11 @@ public class InlongStreamSinkTransfer {
             hiveSink.setDataSeparator(snapshot.getDataSeparator());
             hiveSink.setCharset(snapshot.getCharset());
             hiveSink.setAuthentication(snapshot.getAuthentication());
-            hiveSink.setWarehouseDir(snapshot.getWarehouseDir());
             hiveSink.setFileFormat(snapshot.getFileFormat());
             hiveSink.setJdbcUrl(snapshot.getJdbcUrl());
             hiveSink.setTableName(snapshot.getTableName());
             hiveSink.setDbName(snapshot.getDbName());
-            hiveSink.setHdfsDefaultFs(snapshot.getHdfsDefaultFs());
+            hiveSink.setDataPath(snapshot.getDataPath());
             hiveSink.setSecondaryPartition(snapshot.getSecondaryPartition());
             hiveSink.setPrimaryPartition(snapshot.getPrimaryPartition());
         } else {
@@ -271,12 +269,11 @@ public class InlongStreamSinkTransfer {
             String password = sinkResponse.getPassword();
             String uname = sinkResponse.getUsername();
             hiveSink.setAuthentication(new DefaultAuthentication(uname, password));
-            hiveSink.setWarehouseDir(sinkResponse.getWarehouseDir());
             hiveSink.setFileFormat(FileFormat.forName(sinkResponse.getFileFormat()));
             hiveSink.setJdbcUrl(sinkResponse.getJdbcUrl());
             hiveSink.setTableName(sinkResponse.getTableName());
             hiveSink.setDbName(sinkResponse.getDbName());
-            hiveSink.setHdfsDefaultFs(sinkResponse.getHdfsDefaultFs());
+            hiveSink.setDataPath(sinkResponse.getDataPath());
             hiveSink.setSecondaryPartition(sinkResponse.getSecondaryPartition());
             hiveSink.setPrimaryPartition(sinkResponse.getPrimaryPartition());
         }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
index 65f3ea8..d1293d3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.common.pojo.sink.hive;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -29,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.Map;
 
 /**
  * Hive sink info
@@ -56,11 +56,8 @@ public class HiveSinkDTO {
     @ApiModelProperty("Target table name")
     private String tableName;
 
-    @ApiModelProperty("HDFS defaultFS")
-    private String hdfsDefaultFs;
-
-    @ApiModelProperty("Warehouse directory")
-    private String warehouseDir;
+    @ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
+    private String dataPath;
 
     @ApiModelProperty("Partition interval, support: 1 H, 1 D, 30 I, 10 I")
     private Integer partitionInterval;
@@ -99,8 +96,7 @@ public class HiveSinkDTO {
                 .password(request.getPassword())
                 .dbName(request.getDbName())
                 .tableName(request.getTableName())
-                .hdfsDefaultFs(request.getHdfsDefaultFs())
-                .warehouseDir(request.getWarehouseDir())
+                .dataPath(request.getDataPath())
                 .partitionInterval(request.getPartitionInterval())
                 .partitionUnit(request.getPartitionUnit())
                 .primaryPartition(request.getPrimaryPartition())
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkListResponse.java
index 66fee76..a6c98c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkListResponse.java
@@ -43,8 +43,8 @@ public class HiveSinkListResponse extends SinkListResponse {
     @ApiModelProperty("JDBC URL")
     private String jdbcUrl;
 
-    @ApiModelProperty("HDFS defaultFS")
-    private String hdfsDefaultFs;
+    @ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
+    private String dataPath;
 
     @ApiModelProperty("partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation")
     private String partitionType;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
index 581f780..54dbf64 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
@@ -54,11 +54,8 @@ public class HiveSinkRequest extends SinkRequest {
     @ApiModelProperty("Target table name")
     private String tableName;
 
-    @ApiModelProperty("HDFS defaultFS")
-    private String hdfsDefaultFs;
-
-    @ApiModelProperty("Warehouse directory")
-    private String warehouseDir;
+    @ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
+    private String dataPath;
 
     @ApiModelProperty("Partition interval, support: 1 H, 1 D, 30 I, 10 I")
     private Integer partitionInterval;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
index 0195d0e..135ba95 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
@@ -34,10 +34,6 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 @ApiModel(value = "Response of the Hive sink")
 public class HiveSinkResponse extends SinkResponse {
 
-    public HiveSinkResponse() {
-        this.sinkType = Constant.SINK_HIVE;
-    }
-
     @ApiModelProperty("Hive JDBC URL")
     private String jdbcUrl;
 
@@ -53,11 +49,8 @@ public class HiveSinkResponse extends SinkResponse {
     @ApiModelProperty("Target table name")
     private String tableName;
 
-    @ApiModelProperty("HDFS defaultFS")
-    private String hdfsDefaultFs;
-
-    @ApiModelProperty("Warehouse directory")
-    private String warehouseDir;
+    @ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
+    private String dataPath;
 
     @ApiModelProperty("Partition interval, support: 1 H, 1 D, 30 I, 10 I")
     private Integer partitionInterval;
@@ -83,4 +76,8 @@ public class HiveSinkResponse extends SinkResponse {
     @ApiModelProperty("Data field separator")
     private String dataSeparator;
 
+    public HiveSinkResponse() {
+        this.sinkType = Constant.SINK_HIVE;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index f2212e0..f92f3be 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -162,22 +162,13 @@ public class SinkInfoUtils {
             partitionList.add(new HiveSinkInfo.HiveFieldPartitionInfo(hiveInfo.getSecondaryPartition()));
         }
 
-        // dataPath = hdfsUrl + / + warehouseDir + / + dbName + .db/ + tableName
+        // result = dataPath + "/" + tableName; NOTE(review): a dataPath already ending in "/" is never appended below
         StringBuilder dataPathBuilder = new StringBuilder();
-        String hdfsUrl = hiveInfo.getHdfsDefaultFs();
-        String warehouseDir = hiveInfo.getWarehouseDir();
-        if (hdfsUrl.endsWith("/")) {
-            dataPathBuilder.append(hdfsUrl, 0, hdfsUrl.length() - 1);
-        } else {
-            dataPathBuilder.append(hdfsUrl);
-        }
-        if (warehouseDir.endsWith("/")) {
-            dataPathBuilder.append(warehouseDir, 0, warehouseDir.length() - 1);
-        } else {
-            dataPathBuilder.append(warehouseDir);
+        String dataPath = hiveInfo.getDataPath();
+        if (!dataPath.endsWith("/")) {
+            dataPathBuilder.append(dataPath).append("/");
         }
-        String dataPath = dataPathBuilder.append("/").append(hiveInfo.getDbName())
-                .append(".db/").append(hiveInfo.getTableName()).toString();
+        dataPath = dataPathBuilder.append(hiveInfo.getTableName()).toString();
 
         return new HiveSinkInfo(sinkFields.toArray(new FieldInfo[0]), hiveInfo.getJdbcUrl(),
                 hiveInfo.getDbName(), hiveInfo.getTableName(), hiveInfo.getUsername(), hiveInfo.getPassword(),
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
index 98abd11..cdf2ffd 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
@@ -18,9 +18,6 @@
 package org.apache.inlong.manager.service.thirdparty.sort;
 
 import com.google.common.collect.Lists;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -50,6 +47,10 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+
 public class DisableZkForSortTest extends WorkflowServiceImplTest {
 
 
@@ -85,10 +86,9 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
         hiveSinkRequest.setPassword("password");
         hiveSinkRequest.setDbName("default");
         hiveSinkRequest.setTableName("kip_test");
-        hiveSinkRequest.setJdbcUrl("jdbc:hive2://172.17.12.135:7001");
+        hiveSinkRequest.setJdbcUrl("jdbc:hive2://localhost:7001");
         hiveSinkRequest.setFileFormat("TextFile");
-        hiveSinkRequest.setHdfsDefaultFs("hdfs://172.17.12.235:4007");
-        hiveSinkRequest.setWarehouseDir("/user/hive/warehouse");
+        hiveSinkRequest.setDataPath("hdfs://localhost:4007/user/hive/warehouse/default");
         hiveSinkRequest.setFileFormat(StandardCharsets.UTF_8.name());
         hiveSinkRequest.setDataSeparator("124");
         streamSinkService.save(hiveSinkRequest, OPERATOR);