You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/17 06:25:12 UTC

[inlong] branch master updated: [INLONG-5380][Manager] Modify the saving function of the data node (#5381)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dd3cf6ecf [INLONG-5380][Manager] Modify the saving function of the data node (#5381)
dd3cf6ecf is described below

commit dd3cf6ecff4f655e629a6bd2d6ea2e56f385d3cb
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Wed Aug 17 14:25:07 2022 +0800

    [INLONG-5380][Manager] Modify the saving function of the data node (#5381)
---
 .../client/api/inner/ClientFactoryTest.java        | 17 ++--
 .../inlong/manager/common/enums/ErrorCodeEnum.java |  3 +
 .../inlong/manager/common/util/HttpUtils.java      | 26 ++++--
 .../{DataNodeRequest.java => DataNodeInfo.java}    | 44 +++++-----
 .../manager/pojo/node/DataNodePageRequest.java     |  4 +-
 .../inlong/manager/pojo/node/DataNodeRequest.java  | 16 ++--
 .../manager/pojo/node/hive/HiveDataNodeDTO.java    | 94 ++++++++++++++++++++++
 .../manager/pojo/node/hive/HiveDataNodeInfo.java   | 66 +++++++++++++++
 .../pojo/node/hive/HiveDataNodeRequest.java        | 61 ++++++++++++++
 .../service/node/AbstractDataNodeOperator.java     | 78 ++++++++++++++++++
 .../manager/service/node/DataNodeOperator.java     | 65 +++++++++++++++
 .../service/node/DataNodeOperatorFactory.java      | 46 +++++++++++
 .../service/{core => node}/DataNodeService.java    |  8 +-
 .../{core/impl => node}/DataNodeServiceImpl.java   | 50 ++++++------
 .../service/node/hive/HiveDataNodeOperator.java    | 87 ++++++++++++++++++++
 .../manager/service/sink/AbstractSinkOperator.java | 21 ++++-
 .../manager/service/sink/StreamSinkOperator.java   |  7 ++
 .../service/sink/StreamSinkServiceImpl.java        | 33 +++-----
 .../service/core/impl/DataNodeServiceTest.java     | 29 ++++---
 .../manager/web/controller/DataNodeController.java | 10 +--
 .../web/controller/DataNodeControllerTest.java     | 36 +++++----
 21 files changed, 667 insertions(+), 134 deletions(-)

diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 203fd10ae..26910e33b 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -62,8 +62,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 import org.apache.inlong.manager.pojo.node.DataNodeResponse;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
@@ -965,9 +965,8 @@ class ClientFactoryTest {
                                         Response.success(1))
                                 ))
         );
-        DataNodeRequest request = new DataNodeRequest();
-        request.setName("test_node");
-        request.setType(DataNodeType.HIVE);
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
+        request.setName("test_hive_node");
         Integer nodeId = dataNodeClient.save(request);
         Assertions.assertEquals(1, nodeId);
     }
@@ -1007,9 +1006,8 @@ class ClientFactoryTest {
                         )
         );
 
-        DataNodeRequest request = new DataNodeRequest();
-        request.setName("test_node");
-        request.setToken(DataNodeType.HIVE);
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
+        request.setName("test_hive_node");
         PageInfo<DataNodeResponse> nodePageInfo = dataNodeClient.list(request);
         Assertions.assertEquals(JsonUtils.toJsonString(nodePageInfo.getList()), JsonUtils.toJsonString(nodeResponses));
     }
@@ -1025,10 +1023,9 @@ class ClientFactoryTest {
                         )
         );
 
-        DataNodeRequest request = new DataNodeRequest();
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
         request.setId(1);
-        request.setName("test_node");
-        request.setType(DataNodeType.HIVE);
+        request.setName("test_hive_node");
         Boolean isUpdate = dataNodeClient.update(request);
         Assertions.assertTrue(isUpdate);
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index ce24b12ee..d6cc20776 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -60,6 +60,9 @@ public enum ErrorCodeEnum {
     CLUSTER_TYPE_NOT_SUPPORTED(1102, "Cluster type '%s' not supported"),
     CLUSTER_INFO_INCORRECT(1103, "Cluster info was incorrect"),
 
+    DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"),
+    DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"),
+
     STREAM_NOT_FOUND(1201, "Inlong stream does not exist/no operation permission"),
     STREAM_ID_DUPLICATE(1202, "The current inlong group has a inlong stream with the same ID"),
     STREAM_OPT_NOT_ALLOWED(1203,
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
index 13b6bbad8..09c72464a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
@@ -98,7 +98,7 @@ public class HttpUtils {
     /**
      * Send an HTTP request
      */
-    public <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
+    public static <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
             HttpHeaders header, ParameterizedTypeReference<T> typeReference) {
         if (log.isDebugEnabled()) {
             log.debug("begin request to {} by request body {}", url, GSON.toJson(requestBody));
@@ -112,17 +112,31 @@ public class HttpUtils {
         return response.getBody();
     }
 
-    public <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
+    /**
+     * Send GET request to the specified URL.
+     */
+    public static <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params,
+            HttpHeaders header, ParameterizedTypeReference<T> typeReference) {
+        return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference);
+    }
+
+    /**
+     * Send PUT request to the specified URL.
+     */
+    public static <T> T putRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
             ParameterizedTypeReference<T> typeReference) {
-        return request(restTemplate, url, HttpMethod.POST, params, header, typeReference);
+        return request(restTemplate, url, HttpMethod.PUT, params, header, typeReference);
     }
 
-    public <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params, HttpHeaders header,
+    /**
+     * Send POST request to the specified URL.
+     */
+    public static <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
             ParameterizedTypeReference<T> typeReference) {
-        return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference);
+        return request(restTemplate, url, HttpMethod.POST, params, header, typeReference);
     }
 
-    private String buildUrlWithQueryParam(String url, Map<String, Object> params) {
+    private static String buildUrlWithQueryParam(String url, Map<String, Object> params) {
         if (params == null) {
             return url;
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java
similarity index 60%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java
index 3db00c792..d17569e14 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java
@@ -17,46 +17,42 @@
 
 package org.apache.inlong.manager.pojo.node;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.validation.UpdateValidation;
 
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
+import java.util.Date;
 
 /**
- * Data node request
+ * Data node info
  */
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@ApiModel("Data node  request")
-public class DataNodeRequest {
+@ApiModel("Data node info")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
+public abstract class DataNodeInfo {
 
-    @NotNull(groups = UpdateValidation.class)
     @ApiModelProperty(value = "Primary key")
     private Integer id;
 
-    @NotBlank(message = "node name cannot be blank")
-    @ApiModelProperty(value = "Node name")
+    @ApiModelProperty(value = "Data node name")
     private String name;
 
-    @NotBlank(message = "node type cannot be blank")
-    @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+    @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
     private String type;
 
-    @ApiModelProperty(value = "Node url")
+    @ApiModelProperty(value = "Data node URL")
     private String url;
 
-    @ApiModelProperty(value = "Node username")
+    @ApiModelProperty("Data node username")
     private String username;
 
-    @ApiModelProperty(value = "Node token if needed")
+    @ApiModelProperty(value = "Data node token if needed")
     private String token;
 
     @ApiModelProperty(value = "Extended params")
@@ -65,11 +61,23 @@ public class DataNodeRequest {
     @ApiModelProperty(value = "Description of the data node")
     private String description;
 
-    @NotBlank(message = "inCharges cannot be blank")
-    @ApiModelProperty(value = "Name of responsible person, separated by commas", required = true)
+    @ApiModelProperty(value = "Name of in charges, separated by commas")
     private String inCharges;
 
+    @ApiModelProperty(value = "Name of in creator")
+    private String creator;
+
+    @ApiModelProperty(value = "Name of in modifier")
+    private String modifier;
+
     @ApiModelProperty(value = "Version number")
     private Integer version;
 
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date createTime;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date modifyTime;
+
+    public abstract DataNodeRequest genRequest();
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java
index 6f3862776..496aa9c9c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java
@@ -31,10 +31,10 @@ import org.apache.inlong.manager.pojo.common.PageRequest;
 @ApiModel("Data node paging query request")
 public class DataNodePageRequest extends PageRequest {
 
-    @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+    @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
     private String type;
 
-    @ApiModelProperty(value = "Node name")
+    @ApiModelProperty(value = "Data node name")
     private String name;
 
     @ApiModelProperty(value = "Keywords, name, url, etc.")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
index 3db00c792..54504e2dc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.manager.pojo.node;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
@@ -32,31 +32,31 @@ import javax.validation.constraints.NotNull;
  * Data node request
  */
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 @ApiModel("Data node  request")
-public class DataNodeRequest {
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
+public abstract class DataNodeRequest {
 
     @NotNull(groups = UpdateValidation.class)
     @ApiModelProperty(value = "Primary key")
     private Integer id;
 
     @NotBlank(message = "node name cannot be blank")
-    @ApiModelProperty(value = "Node name")
+    @ApiModelProperty(value = "Data node name")
     private String name;
 
     @NotBlank(message = "node type cannot be blank")
-    @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+    @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
     private String type;
 
-    @ApiModelProperty(value = "Node url")
+    @ApiModelProperty(value = "Data node URL")
     private String url;
 
-    @ApiModelProperty(value = "Node username")
+    @ApiModelProperty(value = "Data node username")
     private String username;
 
-    @ApiModelProperty(value = "Node token if needed")
+    @ApiModelProperty(value = "Data node token if needed")
     private String token;
 
     @ApiModelProperty(value = "Extended params")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
new file mode 100644
index 000000000..5922e61e4
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hive;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Hive data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Hive data node info")
+public class HiveDataNodeDTO {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeDTO.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+
+    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Version for Hive, such as: 3.2.1")
+    private String hiveVersion;
+
+    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
+    private String hiveConfDir;
+
+    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
+    private String hdfsPath;
+
+    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+    private String warehouse;
+
+    @ApiModelProperty("User and group information for writing data to HDFS")
+    private String hdfsUgi;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static HiveDataNodeDTO getFromRequest(HiveDataNodeRequest request) throws Exception {
+        return HiveDataNodeDTO.builder()
+                .jdbcUrl(request.getJdbcUrl())
+                .hiveVersion(request.getHiveVersion())
+                .hiveConfDir(request.getHiveConfDir())
+                .hdfsPath(request.getHdfsPath())
+                .warehouse(request.getWarehouse())
+                .hdfsUgi(request.getHdfsUgi())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static HiveDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, HiveDataNodeDTO.class);
+        } catch (Exception e) {
+            LOGGER.error("Failed to extract additional parameters for hive data node: ", e);
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
new file mode 100644
index 000000000..34b50ed06
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hive;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hive data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HIVE)
+@ApiModel("Hive data node info")
+public class HiveDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Version for Hive, such as: 3.2.1")
+    private String hiveVersion;
+
+    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
+    private String hiveConfDir;
+
+    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
+    private String hdfsPath;
+
+    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+    private String warehouse;
+
+    @ApiModelProperty("User and group information for writing data to HDFS")
+    private String hdfsUgi;
+
+    public HiveDataNodeInfo() {
+        this.setType(DataNodeType.HIVE);
+    }
+
+    @Override
+    public HiveDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, HiveDataNodeRequest::new);
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
new file mode 100644
index 000000000..ab51ed666
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hive;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Hive data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HIVE)
+@ApiModel("Hive data node request")
+public class HiveDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Version for Hive, such as: 3.2.1")
+    private String hiveVersion;
+
+    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
+    private String hiveConfDir;
+
+    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
+    private String hdfsPath;
+
+    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+    private String warehouse;
+
+    @ApiModelProperty("User and group information for writing data to HDFS")
+    private String hdfsUgi;
+
+    public HiveDataNodeRequest() {
+        this.setType(DataNodeType.HIVE);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
new file mode 100644
index 000000000..d66ff1233
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Default operation of data node.
+ */
+public abstract class AbstractDataNodeOperator implements DataNodeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDataNodeOperator.class);
+
+    @Autowired
+    protected DataNodeEntityMapper dataNodeEntityMapper;
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Integer saveOpt(DataNodeRequest request, String operator) {
+        DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
+        // set the ext params
+        this.setTargetEntity(request, entity);
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        dataNodeEntityMapper.insert(entity);
+
+        return entity.getId();
+    }
+
+    /**
+     * Set the parameters of the target entity.
+     *
+     * @param request data node request
+     * @param targetEntity entity which will set the new parameters
+     */
+    protected abstract void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity);
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+    public void updateOpt(DataNodeRequest request, String operator) {
+        DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
+        // set the ext params
+        this.setTargetEntity(request, entity);
+        entity.setModifier(operator);
+        int rowCount = dataNodeEntityMapper.updateByIdSelective(entity);
+        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("data node has already updated with name={}, type={}, curVersion={}", request.getName(),
+                    request.getType(), request.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
new file mode 100644
index 000000000..ce579994e
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Interface of the data node operator.
+ */
+public interface DataNodeOperator {
+
+    /**
+     * Determines whether the current instance matches the specified type.
+     */
+    Boolean accept(String dataNodeType);
+
+    /**
+     * Get the data node type.
+     *
+     * @return data node type string
+     */
+    String getDataNodeType();
+
+    /**
+     * Save the data node info.
+     *
+     * @param request request of the data node
+     * @param operator name of the operator
+     * @return data node id after saving
+     */
+    Integer saveOpt(DataNodeRequest request, String operator);
+
+    /**
+     * Get the data node info from the given entity.
+     *
+     * @param entity get field value from the entity
+     * @return cluster info after encapsulating
+     */
+    DataNodeInfo getFromEntity(DataNodeEntity entity);
+
+    /**
+     * Update the data node info.
+     *
+     * @param request request of update
+     * @param operator name of operator
+     */
+    void updateOpt(DataNodeRequest request, String operator);
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java
new file mode 100644
index 000000000..fba9d84d7
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory for {@link DataNodeOperator}.
+ */
+@Service
+public class DataNodeOperatorFactory {
+
+    @Autowired
+    private List<DataNodeOperator> dataNodeOperatorList;
+
+    /**
+     * Get a cluster operator instance via the given type
+     */
+    public DataNodeOperator getInstance(String type) {
+        return dataNodeOperatorList.stream()
+                .filter(inst -> inst.accept(type))
+                .findFirst()
+                .orElseThrow(() -> new BusinessException(
+                        String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(), type)));
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
similarity index 91%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
index 027fd1bf1..29422e81d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service.node;
 
 import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
 
 /**
  * Data node service layer interface
@@ -42,7 +42,7 @@ public interface DataNodeService {
      * @param id node id
      * @return node info
      */
-    DataNodeResponse get(Integer id);
+    DataNodeInfo get(Integer id);
 
     /**
      * Paging query nodes according to conditions.
@@ -50,7 +50,7 @@ public interface DataNodeService {
      * @param request page request conditions
      * @return node list
      */
-    PageInfo<DataNodeResponse> list(DataNodePageRequest request);
+    PageInfo<DataNodeInfo> list(DataNodePageRequest request);
 
     /**
      * Update data node.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
similarity index 83%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index e87edc293..893987663 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core.impl;
+package org.apache.inlong.manager.service.node;
 
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
@@ -24,14 +24,12 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
-import org.apache.inlong.manager.service.core.DataNodeService;
 import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +39,7 @@ import org.springframework.stereotype.Service;
 import java.sql.Connection;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Data node service layer implementation
@@ -52,6 +51,8 @@ public class DataNodeServiceImpl implements DataNodeService {
 
     @Autowired
     private DataNodeEntityMapper dataNodeMapper;
+    @Autowired
+    private DataNodeOperatorFactory operatorFactory;
 
     @Override
     public Integer save(DataNodeRequest request, String operator) {
@@ -65,33 +66,38 @@ public class DataNodeServiceImpl implements DataNodeService {
             LOGGER.error(errMsg);
             throw new BusinessException(errMsg);
         }
-        DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
-        entity.setCreator(operator);
-        entity.setModifier(operator);
-        dataNodeMapper.insert(entity);
-
+        // according to the data type, save sink information
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
+        int id = dataNodeOperator.saveOpt(request, operator);
         LOGGER.debug("success to save data node={}", request);
-        return entity.getId();
+        return id;
     }
 
     @Override
-    public DataNodeResponse get(Integer id) {
+    public DataNodeInfo get(Integer id) {
         DataNodeEntity entity = dataNodeMapper.selectById(id);
         if (entity == null) {
             LOGGER.error("data node not found by id={}", id);
             throw new BusinessException("data node not found");
         }
-        DataNodeResponse response = CommonBeanUtils.copyProperties(entity, DataNodeResponse::new);
+
+        String dataNodeType = entity.getType();
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType);
+        DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity);
         LOGGER.debug("success to get data node info by id={}", id);
-        return response;
+        return dataNodeInfo;
     }
 
     @Override
-    public PageInfo<DataNodeResponse> list(DataNodePageRequest request) {
+    public PageInfo<DataNodeInfo> list(DataNodePageRequest request) {
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         Page<DataNodeEntity> entityPage = (Page<DataNodeEntity>) dataNodeMapper.selectByCondition(request);
-        List<DataNodeResponse> responseList = CommonBeanUtils.copyListProperties(entityPage, DataNodeResponse::new);
-        PageInfo<DataNodeResponse> page = new PageInfo<>(responseList);
+        List<DataNodeInfo> list = entityPage.stream()
+                .map(entity -> {
+                    DataNodeOperator dataNodeOperator = operatorFactory.getInstance(entity.getType());
+                    return dataNodeOperator.getFromEntity(entity);
+                }).collect(Collectors.toList());
+        PageInfo<DataNodeInfo> page = new PageInfo<>(list);
         page.setTotal(entityPage.getTotal());
         LOGGER.debug("success to list data node by {}", request);
         return page;
@@ -101,9 +107,8 @@ public class DataNodeServiceImpl implements DataNodeService {
     public Boolean update(DataNodeRequest request, String operator) {
         String name = request.getName();
         String type = request.getType();
-
-        Integer id = request.getId();
         DataNodeEntity exist = dataNodeMapper.selectByNameAndType(name, type);
+        Integer id = request.getId();
         if (exist != null && !Objects.equals(id, exist.getId())) {
             String errMsg = String.format("data node already exist for name=%s type=%s", name, type);
             LOGGER.error(errMsg);
@@ -121,13 +126,8 @@ public class DataNodeServiceImpl implements DataNodeService {
             LOGGER.error(errMsg);
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
-        CommonBeanUtils.copyProperties(request, entity, true);
-        entity.setModifier(operator);
-        int rowCount = dataNodeMapper.updateById(entity);
-        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.error(errMsg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
+        dataNodeOperator.updateOpt(request, operator);
         LOGGER.info("success to update data node={}", request);
         return true;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
new file mode 100644
index 000000000..7cb25bd5f
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.hive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class HiveDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.HIVE;
+    }
+
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        HiveDataNodeInfo hiveDataNodeInfo = new HiveDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, hiveDataNodeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            HiveDataNodeDTO dto = HiveDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, hiveDataNodeInfo);
+        }
+
+        LOGGER.debug("success to get hive data node from entity");
+        return hiveDataNodeInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+        HiveDataNodeRequest hiveDataNodeRequest = (HiveDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(hiveDataNodeRequest, targetEntity, true);
+        try {
+            HiveDataNodeDTO dto = HiveDataNodeDTO.getFromRequest(hiveDataNodeRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            LOGGER.debug("success to set entity for hive data node");
+        } catch (Exception e) {
+            LOGGER.error("failed to set entity for hive data node: ", e);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index a771b2d32..2e790fff1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -26,15 +26,15 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.sink.SinkField;
-import org.apache.inlong.manager.pojo.sink.SinkRequest;
-import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -186,6 +186,21 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
         LOGGER.info("success to save sink fields");
     }
 
+    @Override
+    public void deleteOpt(StreamSinkEntity entity, String operator) {
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(InlongConstants.DELETED_STATUS);
+        entity.setIsDeleted(entity.getId());
+        entity.setModifier(operator);
+        int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
+                    entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+        sinkFieldMapper.logicDeleteAll(entity.getId());
+    }
+
     /**
      * Check the validity of sink fields.
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index 69e3b6c1c..e8231c419 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -89,4 +89,11 @@ public interface StreamSinkOperator {
      */
     void updateFieldOpt(Boolean onlyAdd, SinkRequest request);
 
+    /**
+     * Delete the sink info.
+     *
+     * @param entity sink info needs to delete
+     * @param operator name of the operator
+     */
+    void deleteOpt(StreamSinkEntity entity, String operator);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 311fd9329..f5860fb9e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -55,7 +55,6 @@ import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -106,13 +105,13 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         }
 
         // According to the sink type, save sink information
-        StreamSinkOperator operation = operatorFactory.getInstance(request.getSinkType());
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
         List<SinkField> fields = request.getSinkFieldList();
         // Remove id in sinkField when save
         if (CollectionUtils.isNotEmpty(fields)) {
             fields.forEach(sinkField -> sinkField.setId(null));
         }
-        int id = operation.saveOpt(request, operator);
+        int id = sinkOperator.saveOpt(request, operator);
 
         LOGGER.info("success to save sink info: {}", request);
         return id;
@@ -126,8 +125,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             LOGGER.error("sink not found by id={}", id);
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
         }
-        StreamSinkOperator operation = operatorFactory.getInstance(entity.getSinkType());
-        StreamSink streamSink = operation.getFromEntity(entity);
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
+        StreamSink streamSink = sinkOperator.getFromEntity(entity);
         LOGGER.debug("success to get sink info by id={}", id);
         return streamSink;
     }
@@ -191,8 +190,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         }
         List<StreamSink> responseList = Lists.newArrayList();
         for (Map.Entry<String, Page<StreamSinkEntity>> entry : sinkMap.entrySet()) {
-            StreamSinkOperator operation = operatorFactory.getInstance(entry.getKey());
-            PageInfo<? extends StreamSink> pageInfo = operation.getPageInfo(entry.getValue());
+            StreamSinkOperator sinkOperator = operatorFactory.getInstance(entry.getKey());
+            PageInfo<? extends StreamSink> pageInfo = sinkOperator.getPageInfo(entry.getValue());
             responseList.addAll(pageInfo.getList());
         }
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
@@ -230,8 +229,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             fields.forEach(sinkField -> sinkField.setId(null));
         }
 
-        StreamSinkOperator operation = operatorFactory.getInstance(request.getSinkType());
-        operation.updateOpt(request, operator);
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
+        sinkOperator.updateOpt(request, operator);
 
         // The inlong group status is [Configuration successful], then asynchronously initiate
         // the [Single inlong stream resource creation] workflow
@@ -266,20 +265,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
-
-        entity.setPreviousStatus(entity.getStatus());
-        entity.setStatus(InlongConstants.DELETED_STATUS);
-        entity.setIsDeleted(id);
-        entity.setModifier(operator);
-        entity.setModifyTime(new Date());
-        int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
-        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
-                    entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion());
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
-        sinkFieldMapper.logicDeleteAll(id);
-
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
+        sinkOperator.deleteOpt(entity, operator);
         LOGGER.info("success to delete sink info: {}", entity);
         return true;
     }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
index a35aef416..41936336b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
@@ -18,11 +18,12 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
 import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.DataNodeService;
+import org.apache.inlong.manager.service.node.DataNodeService;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -39,7 +40,7 @@ public class DataNodeServiceTest extends ServiceBaseTest {
      * Save data node info.
      */
     public Integer saveOpt(String nodeName, String type, String url, String username, String password) {
-        DataNodeRequest request = new DataNodeRequest();
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
         request.setName(nodeName);
         request.setType(type);
         request.setUrl(url);
@@ -47,13 +48,15 @@ public class DataNodeServiceTest extends ServiceBaseTest {
         request.setToken(password);
         request.setDescription("test cluster");
         request.setInCharges(GLOBAL_OPERATOR);
+        request.setJdbcUrl("127.0.0.1");
+        request.setToken("123456");
         return dataNodeService.save(request, GLOBAL_OPERATOR);
     }
 
     /**
      * Get data node list info.
      */
-    public PageInfo<DataNodeResponse> listOpt(String type, String name) {
+    public PageInfo<DataNodeInfo> listOpt(String type, String name) {
         DataNodePageRequest request = new DataNodePageRequest();
         request.setType(type);
         request.setName(name);
@@ -65,7 +68,7 @@ public class DataNodeServiceTest extends ServiceBaseTest {
      */
     public Boolean updateOpt(Integer id, String nodeName, String type, String url, String username, String password,
             Integer version) {
-        DataNodeRequest request = new DataNodeRequest();
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
         request.setId(id);
         request.setName(nodeName);
         request.setType(type);
@@ -86,7 +89,7 @@ public class DataNodeServiceTest extends ServiceBaseTest {
     @Test
     public void testDataService() {
         String nodeName = "hiveNode1";
-        String type = "HIVE";
+        String type = DataNodeType.HIVE;
         String url = "127.0.0.1:8080";
         String usename = "admin";
         String password = "123";
@@ -96,17 +99,17 @@ public class DataNodeServiceTest extends ServiceBaseTest {
         Assertions.assertNotNull(id);
 
         // test get data node
-        DataNodeResponse nodeResponse = dataNodeService.get(id);
-        Assertions.assertNotNull(nodeResponse);
-        Assertions.assertEquals(type, nodeResponse.getType());
+        DataNodeInfo dataNodeInfo = dataNodeService.get(id);
+        Assertions.assertNotNull(dataNodeInfo);
+        Assertions.assertEquals(type, dataNodeInfo.getType());
 
         // test get data node list
-        PageInfo<DataNodeResponse> listDataNode = this.listOpt(type, nodeName);
+        PageInfo<DataNodeInfo> listDataNode = this.listOpt(type, nodeName);
         Assertions.assertEquals(listDataNode.getTotal(), 1);
 
         // test update data node
-        String newNodeName = "kafkaNode1";
-        String newType = "KAFKA";
+        String newNodeName = "hiveNode2";
+        String newType = DataNodeType.HIVE;
         String newUrl = "127.0.0.1:8083";
         String newUsername = "admin2";
         String newPassword = "456";
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
index a8c8a81bd..802181c00 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
@@ -22,13 +22,13 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
-import org.apache.inlong.manager.service.core.DataNodeService;
+import org.apache.inlong.manager.service.node.DataNodeService;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
 import org.apache.inlong.manager.service.user.LoginUserUtils;
 import org.apache.shiro.authz.annotation.RequiresRoles;
@@ -65,13 +65,13 @@ public class DataNodeController {
     @GetMapping(value = "/node/get/{id}")
     @ApiOperation(value = "Get node by id")
     @ApiImplicitParam(name = "id", value = "Data node ID", dataTypeClass = Integer.class, required = true)
-    public Response<DataNodeResponse> get(@PathVariable Integer id) {
+    public Response<DataNodeInfo> get(@PathVariable Integer id) {
         return Response.success(dataNodeService.get(id));
     }
 
     @PostMapping(value = "/node/list")
     @ApiOperation(value = "List data node")
-    public Response<PageInfo<DataNodeResponse>> list(@RequestBody DataNodePageRequest request) {
+    public Response<PageInfo<DataNodeInfo>> list(@RequestBody DataNodePageRequest request) {
         return Response.success(dataNodeService.list(request));
     }
 
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
index 99223394f..ba7b10353 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.inlong.manager.web.controller;
 
-import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.DataNodeResponse;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
 import org.apache.inlong.manager.web.WebBaseTest;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -36,15 +38,15 @@ class DataNodeControllerTest extends WebBaseTest {
     @Resource
     DataNodeEntityMapper dataNodeEntityMapper;
 
-    DataNodeRequest getDataNodeRequest() {
-        return DataNodeRequest.builder()
-                .name("hiveNode1")
-                .type("HIVE")
-                .url("127.0.0.1:8080")
-                .username("admin")
-                .token("123")
-                .inCharges("admin")
-                .build();
+    HiveDataNodeRequest getHiveDataNodeRequest() {
+        HiveDataNodeRequest hiveDataNodeRequest = new HiveDataNodeRequest();
+        hiveDataNodeRequest.setName("hiveNode1");
+        hiveDataNodeRequest.setType(DataNodeType.HIVE);
+        hiveDataNodeRequest.setUrl("127.0.0.1:8080");
+        hiveDataNodeRequest.setUsername("admin");
+        hiveDataNodeRequest.setToken("123");
+        hiveDataNodeRequest.setInCharges("admin");
+        return hiveDataNodeRequest;
     }
 
     @Test
@@ -52,7 +54,7 @@ class DataNodeControllerTest extends WebBaseTest {
         logout();
         operatorLogin();
 
-        MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getDataNodeRequest());
+        MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getHiveDataNodeRequest());
 
         Response<Integer> response = getResBody(mvcResult, Integer.class);
         Assertions.assertEquals("Current user [operator] has no permission to access URL", response.getErrMsg());
@@ -61,7 +63,7 @@ class DataNodeControllerTest extends WebBaseTest {
     @Test
     void testSaveAndGetAndDelete() throws Exception {
         // save
-        MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getDataNodeRequest());
+        MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getHiveDataNodeRequest());
 
         Integer dataNodeId = getResBodyObj(mvcResult, Integer.class);
         Assertions.assertNotNull(dataNodeId);
@@ -71,7 +73,7 @@ class DataNodeControllerTest extends WebBaseTest {
 
         DataNodeResponse dataNode = getResBodyObj(getResult, DataNodeResponse.class);
         Assertions.assertNotNull(dataNode);
-        Assertions.assertEquals(getDataNodeRequest().getName(), dataNode.getName());
+        Assertions.assertEquals(getHiveDataNodeRequest().getName(), dataNode.getName());
 
         // delete
         MvcResult deleteResult = deleteForSuccessMvcResult("/api/node/delete/{id}", dataNodeId);
@@ -99,7 +101,7 @@ class DataNodeControllerTest extends WebBaseTest {
 
         dataNodeEntityMapper.insert(nodeEntity);
 
-        DataNodeRequest request = getDataNodeRequest();
+        DataNodeRequest request = getHiveDataNodeRequest();
         request.setId(nodeEntity.getId());
         request.setName("test447777");
         request.setVersion(nodeEntity.getVersion());
@@ -114,7 +116,7 @@ class DataNodeControllerTest extends WebBaseTest {
 
     @Test
     void testUpdateFailByNoId() throws Exception {
-        MvcResult mvcResult = postForSuccessMvcResult("/api/node/update", getDataNodeRequest());
+        MvcResult mvcResult = postForSuccessMvcResult("/api/node/update", getHiveDataNodeRequest());
 
         Response<Boolean> response = getResBody(mvcResult, Boolean.class);
         Assertions.assertFalse(response.isSuccess());