Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/17 06:33:13 UTC

[inlong] branch master updated (dd3cf6ecf -> 65674c18a)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


    omit dd3cf6ecf [INLONG-5380][Manager] Modify the saving function of the data node (#5381)
    omit cbfe2f11b [INLONG-5461][Sort] Add Audit for mongoDB extract node (#5548)
     new 4de795b33 [INLONG-5461][Sort] Add audit for MongoDB extract node (#5548)
     new 65674c18a [INLONG-5380][Manager] Modify the saving function of the data node (#5381)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (dd3cf6ecf)
            \
             N -- N -- N   refs/heads/master (65674c18a)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[inlong] 01/02: [INLONG-5461][Sort] Add audit for MongoDB extract node (#5548)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 4de795b3303ca146a7c7a56fd30ba84992573440
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Wed Aug 17 10:54:28 2022 +0800

    [INLONG-5461][Sort] Add audit for MongoDB extract node (#5548)
---
 inlong-sort/sort-connectors/mongodb-cdc/pom.xml    | 11 +++++
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   | 22 +++++++---
 .../inlong/sort/cdc/mongodb/MongoDBSource.java     | 14 ++++--
 .../sort/cdc/mongodb/table/MongoDBTableSource.java | 22 ++++++----
 .../mongodb/table/MongoDBTableSourceFactory.java   | 51 +++++++++++-----------
 5 files changed, 78 insertions(+), 42 deletions(-)

diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
index f4d3e6036..077595230 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
@@ -41,6 +41,11 @@
             <artifactId>sort-connector-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -98,6 +103,12 @@
                                 </filter>
                             </filters>
                             <relocations>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>
+                                        org.apache.inlong.sort.cdc.mongodb.shaded.org.apache.inlong.sort.base
+                                    </shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     <shadedPattern>
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index db41472ed..450990eb4 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -68,7 +69,9 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -76,8 +79,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -222,6 +227,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private String inlongMetric;
 
+    private String inlongAudit;
+
     private SourceMetricData metricData;
 
     // ---------------------------------------------------------------------------------------
@@ -230,12 +237,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             DebeziumDeserializationSchema<T> deserializer,
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
-            Validator validator, String inlongMetric) {
+            Validator validator, String inlongMetric, String inlongAudit) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
         this.inlongMetric = inlongMetric;
+        this.inlongAudit = inlongAudit;
     }
 
     @Override
@@ -414,7 +422,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup);
+            AuditImp auditImp = null;
+            if (inlongAudit != null) {
+                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+                auditImp = AuditImp.getInstance();
+            }
+            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
             metricData.registerMetricsForNumRecordsIn();
             metricData.registerMetricsForNumBytesIn();
             metricData.registerMetricsForNumBytesInPerSecond();
@@ -458,9 +471,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 if (metricData != null) {
-                                    metricData.getNumRecordsIn().inc(1L);
-                                    metricData.getNumBytesIn()
-                                            .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    metricData.outputMetrics(1L,
+                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
                                 }
                                 deserializer.deserialize(record, out);
                             }
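
Taken out of the diff, the new audit wiring is small: parse the inlong.metric
string into group/stream/node IDs and, when inlong.audit is set, point the
audit SDK singleton at the configured proxies before building the
SourceMetricData. A minimal standalone sketch, assuming DELIMITER mirrors the
conventional "&" from org.apache.inlong.sort.base.Constants (hard-coded here
only for illustration):

    import java.util.Arrays;
    import java.util.HashSet;

    import org.apache.inlong.audit.AuditImp;

    public class AuditWiringSketch {

        // Assumption: mirrors org.apache.inlong.sort.base.Constants.DELIMITER.
        private static final String DELIMITER = "&";

        public static AuditImp initAudit(String inlongMetric, String inlongAudit) {
            // inlong.metric carries GROUP ID + '&' + STREAM ID + '&' + NODE ID,
            // per the option description removed from the factory.
            String[] parts = inlongMetric.split(DELIMITER);
            String groupId = parts[0];
            String streamId = parts[1];
            String nodeId = parts[2];
            System.out.printf("registering metrics for %s/%s/%s%n", groupId, streamId, nodeId);

            if (inlongAudit == null) {
                // Audit stays disabled; the metrics alone still register.
                return null;
            }
            // Register the audit proxy addresses with the SDK singleton,
            // exactly as the new code in DebeziumSourceFunction#run does.
            AuditImp.getInstance().setAuditProxy(
                    new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
            return AuditImp.getInstance();
        }
    }
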
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
index ebe9bf4b5..dad8581d7 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
@@ -139,7 +139,8 @@ public class MongoDBSource {
         private String errorsTolerance;
         private Integer heartbeatIntervalMillis;
         private DebeziumDeserializationSchema<T> deserializer;
-        private String inLongMetric;
+        private String inlongMetric;
+        private String inlongAudit;
 
         /** The comma-separated list of hostname and port pairs of mongodb servers. */
         public Builder<T> hosts(String hosts) {
@@ -317,8 +318,13 @@ public class MongoDBSource {
             return this;
         }
 
-        public Builder<T> inLongMetric(String inLongMetric) {
-            this.inLongMetric = inLongMetric;
+        public Builder<T> inlongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
+            return this;
+        }
+
+        public Builder<T> inlongAudit(String inlongAudit) {
+            this.inlongAudit = inlongAudit;
             return this;
         }
 
@@ -441,7 +447,7 @@ public class MongoDBSource {
                     Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME_DEFAULT);
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, null, Validator.getDefaultValidator(), inLongMetric);
+                    deserializer, props, null, Validator.getDefaultValidator(), inlongMetric, inlongAudit);
         }
     }
 }
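
A hedged usage sketch of the renamed builder methods, assuming it lives in
org.apache.inlong.sort.cdc.mongodb. The <T>builder() entry point and the
deserializer(...) setter are assumptions based on the upstream flink-cdc
connector API; only hosts(...), inlongMetric(...), inlongAudit(...) and
build() are visible in this diff:

    import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

    public class MongoSourceSketch {

        public static <T> DebeziumSourceFunction<T> build(
                DebeziumDeserializationSchema<T> deserializer) {
            return MongoDBSource.<T>builder()                // entry point assumed
                    .hosts("localhost:27017")                // host:port pairs
                    .deserializer(deserializer)              // setter assumed
                    .inlongMetric("groupId&streamId&nodeId") // group & stream & node IDs
                    .inlongAudit("127.0.0.1:10081")          // illustrative audit proxy
                    .build();
        }
    }
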
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
index e037ef55a..612f8b48d 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -76,7 +76,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
     private final Integer heartbeatIntervalMillis;
     private final ZoneId localTimeZone;
 
-    private final String inLongMetric;
+    private final String inlongMetric;
+    private final String inlongAudit;
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -106,7 +107,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
             @Nullable Integer pollAwaitTimeMillis,
             @Nullable Integer heartbeatIntervalMillis,
             ZoneId localTimeZone,
-            String inLongMetric) {
+            String inlongMetric,
+            String inlongAudit) {
         this.physicalSchema = physicalSchema;
         this.hosts = checkNotNull(hosts);
         this.username = username;
@@ -126,7 +128,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         this.localTimeZone = localTimeZone;
         this.producedDataType = physicalSchema.toPhysicalRowDataType();
         this.metadataKeys = Collections.emptyList();
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
+        this.inlongAudit = inlongAudit;
     }
 
     @Override
@@ -184,8 +187,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
         Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
         Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
-        Optional.ofNullable(inLongMetric).ifPresent(builder::inLongMetric);
-
+        Optional.ofNullable(inlongMetric).ifPresent(builder::inlongMetric);
+        Optional.ofNullable(inlongAudit).ifPresent(builder::inlongAudit);
         DebeziumSourceFunction<RowData> sourceFunction = builder.build();
 
         return SourceFunctionProvider.of(sourceFunction, false);
@@ -243,7 +246,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                         pollAwaitTimeMillis,
                         heartbeatIntervalMillis,
                         localTimeZone,
-                        inLongMetric);
+                        inlongMetric,
+                        inlongAudit);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -277,7 +281,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 && Objects.equals(localTimeZone, that.localTimeZone)
                 && Objects.equals(producedDataType, that.producedDataType)
                 && Objects.equals(metadataKeys, that.metadataKeys)
-                && Objects.equals(inLongMetric, that.inLongMetric);
+                && Objects.equals(inlongMetric, that.inlongMetric)
+                && Objects.equals(inlongAudit, that.inlongAudit);
     }
 
     @Override
@@ -302,7 +307,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 localTimeZone,
                 producedDataType,
                 metadataKeys,
-                inLongMetric);
+                inlongMetric,
+                inlongAudit);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index d7299cf19..d235bc860 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 
 import java.time.ZoneId;
 import java.util.HashSet;
@@ -36,6 +37,8 @@ import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
 /**
  * Factory for creating configured instance of {@link MongoDBTableSource}.
@@ -46,12 +49,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
 
     private static final String DOCUMENT_ID_FIELD = "_id";
 
-    public static final ConfigOption<String> INLONG_METRIC =
-            ConfigOptions.key("inlong.metric")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
     private static final ConfigOption<String> HOSTS =
             ConfigOptions.key("hosts")
                     .stringType()
@@ -199,35 +196,37 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
 
         final ReadableConfig config = helper.getOptions();
 
-        String hosts = config.get(HOSTS);
-        String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
+        final String hosts = config.get(HOSTS);
+        final String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
 
-        String username = config.getOptional(USERNAME).orElse(null);
-        String password = config.getOptional(PASSWORD).orElse(null);
+        final String username = config.getOptional(USERNAME).orElse(null);
+        final String password = config.getOptional(PASSWORD).orElse(null);
 
-        String database = config.getOptional(DATABASE).orElse(null);
-        String collection = config.getOptional(COLLECTION).orElse(null);
+        final String database = config.getOptional(DATABASE).orElse(null);
+        final String collection = config.getOptional(COLLECTION).orElse(null);
 
-        String errorsTolerance = config.get(ERRORS_TOLERANCE);
-        Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
+        final String errorsTolerance = config.get(ERRORS_TOLERANCE);
+        final Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
 
-        Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
-        Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
+        final Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
+        final Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
 
-        Integer heartbeatIntervalMillis =
+        final Integer heartbeatIntervalMillis =
                 config.getOptional(HEARTBEAT_INTERVAL_MILLIS).orElse(null);
 
-        Boolean copyExisting = config.get(COPY_EXISTING);
-        String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
-        Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
-        Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
+        final Boolean copyExisting = config.get(COPY_EXISTING);
+        final String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
+        final Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
+        final Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
 
-        String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
-        ZoneId localTimeZone =
+        final String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
+        final ZoneId localTimeZone =
                 TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId)
                         ? ZoneId.systemDefault()
                         : ZoneId.of(zoneId);
-        String inLongMetric = config.get(INLONG_METRIC);
+        final String inlongMetric = config.get(INLONG_METRIC);
+        final String inlongAudit = config.get(INLONG_AUDIT);
+        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
@@ -251,7 +250,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                 pollAwaitTimeMillis,
                 heartbeatIntervalMillis,
                 localTimeZone,
-                inLongMetric);
+                inlongMetric,
+                inlongAudit);
     }
 
     private void checkPrimaryKey(UniqueConstraint pk, String message) {
@@ -290,6 +290,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         options.add(POLL_AWAIT_TIME_MILLIS);
         options.add(HEARTBEAT_INTERVAL_MILLIS);
         options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
         return options;
     }
 }
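With the factory changes, both options can be supplied in table DDL. A sketch
of that usage; the connector identifier 'mongodb-cdc-inlong' is hypothetical
(the factory's real identifier is not shown in this diff), and the option keys
are assumed to stay "inlong.metric" and "inlong.audit" after the move to
Constants:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MongoDdlSketch {

        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // The factory checks that a primary key is present, hence _id.
            tableEnv.executeSql("CREATE TABLE orders (\n"
                    + "  _id STRING,\n"
                    + "  PRIMARY KEY (_id) NOT ENFORCED\n"
                    + ") WITH (\n"
                    + "  'connector' = 'mongodb-cdc-inlong',\n" // hypothetical id
                    + "  'hosts' = 'localhost:27017',\n"
                    + "  'database' = 'inventory',\n"
                    + "  'collection' = 'orders',\n"
                    + "  'inlong.metric' = 'groupId&streamId&nodeId',\n"
                    + "  'inlong.audit' = '127.0.0.1:10081'\n"
                    + ")");
        }
    }
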


[inlong] 02/02: [INLONG-5380][Manager] Modify the saving function of the data node (#5381)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 65674c18a34db32c2824ea78260057ba2aef0240
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Wed Aug 17 14:25:07 2022 +0800

    [INLONG-5380][Manager] Modify the saving function of the data node (#5381)
    
    * Modify the saving function of the data node
    
    * Add params for Hive node
    
    * Fix the unit tests error
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../client/api/inner/ClientFactoryTest.java        | 17 ++--
 .../inlong/manager/common/enums/ErrorCodeEnum.java |  3 +
 .../inlong/manager/common/util/HttpUtils.java      | 26 ++++--
 .../{DataNodeRequest.java => DataNodeInfo.java}    | 44 +++++-----
 .../manager/pojo/node/DataNodePageRequest.java     |  4 +-
 .../inlong/manager/pojo/node/DataNodeRequest.java  | 16 ++--
 .../manager/pojo/node/hive/HiveDataNodeDTO.java    | 94 ++++++++++++++++++++++
 .../manager/pojo/node/hive/HiveDataNodeInfo.java   | 66 +++++++++++++++
 .../pojo/node/hive/HiveDataNodeRequest.java        | 61 ++++++++++++++
 .../service/node/AbstractDataNodeOperator.java     | 78 ++++++++++++++++++
 .../manager/service/node/DataNodeOperator.java     | 65 +++++++++++++++
 .../service/node/DataNodeOperatorFactory.java      | 46 +++++++++++
 .../service/{core => node}/DataNodeService.java    |  8 +-
 .../{core/impl => node}/DataNodeServiceImpl.java   | 50 ++++++------
 .../service/node/hive/HiveDataNodeOperator.java    | 87 ++++++++++++++++++++
 .../manager/service/sink/AbstractSinkOperator.java | 21 ++++-
 .../manager/service/sink/StreamSinkOperator.java   |  7 ++
 .../service/sink/StreamSinkServiceImpl.java        | 33 +++-----
 .../service/core/impl/DataNodeServiceTest.java     | 29 ++++---
 .../manager/web/controller/DataNodeController.java | 10 +--
 .../web/controller/DataNodeControllerTest.java     | 36 +++++----
 21 files changed, 667 insertions(+), 134 deletions(-)

diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 203fd10ae..26910e33b 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -62,8 +62,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 import org.apache.inlong.manager.pojo.node.DataNodeResponse;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
@@ -965,9 +965,8 @@ class ClientFactoryTest {
                                         Response.success(1))
                                 ))
         );
-        DataNodeRequest request = new DataNodeRequest();
-        request.setName("test_node");
-        request.setType(DataNodeType.HIVE);
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
+        request.setName("test_hive_node");
         Integer nodeId = dataNodeClient.save(request);
         Assertions.assertEquals(1, nodeId);
     }
@@ -1007,9 +1006,8 @@ class ClientFactoryTest {
                         )
         );
 
-        DataNodeRequest request = new DataNodeRequest();
-        request.setName("test_node");
-        request.setToken(DataNodeType.HIVE);
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
+        request.setName("test_hive_node");
         PageInfo<DataNodeResponse> nodePageInfo = dataNodeClient.list(request);
         Assertions.assertEquals(JsonUtils.toJsonString(nodePageInfo.getList()), JsonUtils.toJsonString(nodeResponses));
     }
@@ -1025,10 +1023,9 @@ class ClientFactoryTest {
                         )
         );
 
-        DataNodeRequest request = new DataNodeRequest();
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
         request.setId(1);
-        request.setName("test_node");
-        request.setType(DataNodeType.HIVE);
+        request.setName("test_hive_node");
         Boolean isUpdate = dataNodeClient.update(request);
         Assertions.assertTrue(isUpdate);
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index ce24b12ee..d6cc20776 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -60,6 +60,9 @@ public enum ErrorCodeEnum {
     CLUSTER_TYPE_NOT_SUPPORTED(1102, "Cluster type '%s' not supported"),
     CLUSTER_INFO_INCORRECT(1103, "Cluster info was incorrect"),
 
+    DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"),
+    DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"),
+
     STREAM_NOT_FOUND(1201, "Inlong stream does not exist/no operation permission"),
     STREAM_ID_DUPLICATE(1202, "The current inlong group has a inlong stream with the same ID"),
     STREAM_OPT_NOT_ALLOWED(1203,
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
index 13b6bbad8..09c72464a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
@@ -98,7 +98,7 @@ public class HttpUtils {
     /**
      * Send an HTTP request
      */
-    public <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
+    public static <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
             HttpHeaders header, ParameterizedTypeReference<T> typeReference) {
         if (log.isDebugEnabled()) {
             log.debug("begin request to {} by request body {}", url, GSON.toJson(requestBody));
@@ -112,17 +112,31 @@ public class HttpUtils {
         return response.getBody();
     }
 
-    public <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
+    /**
+     * Send GET request to the specified URL.
+     */
+    public static <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params,
+            HttpHeaders header, ParameterizedTypeReference<T> typeReference) {
+        return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference);
+    }
+
+    /**
+     * Send PUT request to the specified URL.
+     */
+    public static <T> T putRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
             ParameterizedTypeReference<T> typeReference) {
-        return request(restTemplate, url, HttpMethod.POST, params, header, typeReference);
+        return request(restTemplate, url, HttpMethod.PUT, params, header, typeReference);
     }
 
-    public <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params, HttpHeaders header,
+    /**
+     * Send POST request to the specified URL.
+     */
+    public static <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
             ParameterizedTypeReference<T> typeReference) {
-        return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference);
+        return request(restTemplate, url, HttpMethod.POST, params, header, typeReference);
     }
 
-    private String buildUrlWithQueryParam(String url, Map<String, Object> params) {
+    private static String buildUrlWithQueryParam(String url, Map<String, Object> params) {
         if (params == null) {
             return url;
         }
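
Because the helpers are now static, callers no longer need an HttpUtils
instance. A brief sketch against the new signatures; the URL and the Integer
response type are illustrative only:

    import org.springframework.core.ParameterizedTypeReference;
    import org.springframework.http.HttpHeaders;
    import org.springframework.web.client.RestTemplate;

    import org.apache.inlong.manager.common.util.HttpUtils;

    public class HttpUtilsUsageSketch {

        public static Integer saveNode(RestTemplate restTemplate, Object requestBody) {
            HttpHeaders headers = new HttpHeaders();
            // POST a request body and read back an id; the new PUT and GET
            // helpers work the same way through the shared request() method.
            return HttpUtils.postRequest(restTemplate,
                    "http://127.0.0.1:8083/api/node/save", // illustrative URL
                    requestBody, headers,
                    new ParameterizedTypeReference<Integer>() {
                    });
        }
    }
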
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java
similarity index 60%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java
index 3db00c792..d17569e14 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java
@@ -17,46 +17,42 @@
 
 package org.apache.inlong.manager.pojo.node;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.validation.UpdateValidation;
 
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
+import java.util.Date;
 
 /**
- * Data node request
+ * Data node info
  */
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@ApiModel("Data node  request")
-public class DataNodeRequest {
+@ApiModel("Data node info")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
+public abstract class DataNodeInfo {
 
-    @NotNull(groups = UpdateValidation.class)
     @ApiModelProperty(value = "Primary key")
     private Integer id;
 
-    @NotBlank(message = "node name cannot be blank")
-    @ApiModelProperty(value = "Node name")
+    @ApiModelProperty(value = "Data node name")
     private String name;
 
-    @NotBlank(message = "node type cannot be blank")
-    @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+    @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
     private String type;
 
-    @ApiModelProperty(value = "Node url")
+    @ApiModelProperty(value = "Data node URL")
     private String url;
 
-    @ApiModelProperty(value = "Node username")
+    @ApiModelProperty("Data node username")
     private String username;
 
-    @ApiModelProperty(value = "Node token if needed")
+    @ApiModelProperty(value = "Data node token if needed")
     private String token;
 
     @ApiModelProperty(value = "Extended params")
@@ -65,11 +61,23 @@ public class DataNodeRequest {
     @ApiModelProperty(value = "Description of the data node")
     private String description;
 
-    @NotBlank(message = "inCharges cannot be blank")
-    @ApiModelProperty(value = "Name of responsible person, separated by commas", required = true)
+    @ApiModelProperty(value = "Name of in charges, separated by commas")
     private String inCharges;
 
+    @ApiModelProperty(value = "Name of in creator")
+    private String creator;
+
+    @ApiModelProperty(value = "Name of in modifier")
+    private String modifier;
+
     @ApiModelProperty(value = "Version number")
     private Integer version;
 
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date createTime;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date modifyTime;
+
+    public abstract DataNodeRequest genRequest();
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java
index 6f3862776..496aa9c9c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java
@@ -31,10 +31,10 @@ import org.apache.inlong.manager.pojo.common.PageRequest;
 @ApiModel("Data node paging query request")
 public class DataNodePageRequest extends PageRequest {
 
-    @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+    @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
     private String type;
 
-    @ApiModelProperty(value = "Node name")
+    @ApiModelProperty(value = "Data node name")
     private String name;
 
     @ApiModelProperty(value = "Keywords, name, url, etc.")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
index 3db00c792..54504e2dc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.manager.pojo.node;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
@@ -32,31 +32,31 @@ import javax.validation.constraints.NotNull;
  * Data node request
  */
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 @ApiModel("Data node  request")
-public class DataNodeRequest {
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
+public abstract class DataNodeRequest {
 
     @NotNull(groups = UpdateValidation.class)
     @ApiModelProperty(value = "Primary key")
     private Integer id;
 
     @NotBlank(message = "node name cannot be blank")
-    @ApiModelProperty(value = "Node name")
+    @ApiModelProperty(value = "Data node name")
     private String name;
 
     @NotBlank(message = "node type cannot be blank")
-    @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+    @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
     private String type;
 
-    @ApiModelProperty(value = "Node url")
+    @ApiModelProperty(value = "Data node URL")
     private String url;
 
-    @ApiModelProperty(value = "Node username")
+    @ApiModelProperty(value = "Data node username")
     private String username;
 
-    @ApiModelProperty(value = "Node token if needed")
+    @ApiModelProperty(value = "Data node token if needed")
     private String token;
 
     @ApiModelProperty(value = "Extended params")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
new file mode 100644
index 000000000..5922e61e4
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hive;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Hive data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Hive data node info")
+public class HiveDataNodeDTO {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeDTO.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+
+    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Version for Hive, such as: 3.2.1")
+    private String hiveVersion;
+
+    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
+    private String hiveConfDir;
+
+    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
+    private String hdfsPath;
+
+    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+    private String warehouse;
+
+    @ApiModelProperty("User and group information for writing data to HDFS")
+    private String hdfsUgi;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static HiveDataNodeDTO getFromRequest(HiveDataNodeRequest request) throws Exception {
+        return HiveDataNodeDTO.builder()
+                .jdbcUrl(request.getJdbcUrl())
+                .hiveVersion(request.getHiveVersion())
+                .hiveConfDir(request.getHiveConfDir())
+                .hdfsPath(request.getHdfsPath())
+                .warehouse(request.getWarehouse())
+                .hdfsUgi(request.getHdfsUgi())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static HiveDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, HiveDataNodeDTO.class);
+        } catch (Exception e) {
+            LOGGER.error("Failed to extract additional parameters for hive data node: ", e);
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
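
The DTO is what gets flattened into the node's extended params and recovered
later. A round-trip sketch; the local ObjectMapper stands in for wherever the
entity actually serializes extParams:

    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeDTO;
    import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;

    public class HiveDtoRoundTripSketch {

        public static void main(String[] args) throws Exception {
            HiveDataNodeRequest request = new HiveDataNodeRequest();
            request.setJdbcUrl("jdbc:hive2://127.0.0.1:10000"); // illustrative values
            request.setHiveVersion("3.1.2");

            // Request -> DTO, as the save path does.
            HiveDataNodeDTO dto = HiveDataNodeDTO.getFromRequest(request);

            // DTO -> JSON ext params -> DTO, as the read path does.
            String extParams = new ObjectMapper().writeValueAsString(dto);
            HiveDataNodeDTO parsed = HiveDataNodeDTO.getFromJson(extParams);
            System.out.println(parsed.getJdbcUrl());
        }
    }
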
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
new file mode 100644
index 000000000..34b50ed06
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hive;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hive data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HIVE)
+@ApiModel("Hive data node info")
+public class HiveDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Version for Hive, such as: 3.2.1")
+    private String hiveVersion;
+
+    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
+    private String hiveConfDir;
+
+    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
+    private String hdfsPath;
+
+    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+    private String warehouse;
+
+    @ApiModelProperty("User and group information for writing data to HDFS")
+    private String hdfsUgi;
+
+    public HiveDataNodeInfo() {
+        this.setType(DataNodeType.HIVE);
+    }
+
+    @Override
+    public HiveDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, HiveDataNodeRequest::new);
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
new file mode 100644
index 000000000..ab51ed666
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hive;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Hive data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HIVE)
+@ApiModel("Hive data node request")
+public class HiveDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Version for Hive, such as: 3.2.1")
+    private String hiveVersion;
+
+    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
+    private String hiveConfDir;
+
+    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
+    private String hdfsPath;
+
+    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+    private String warehouse;
+
+    @ApiModelProperty("User and group information for writing data to HDFS")
+    private String hdfsUgi;
+
+    public HiveDataNodeRequest() {
+        this.setType(DataNodeType.HIVE);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
new file mode 100644
index 000000000..d66ff1233
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Default operation of data node.
+ */
+public abstract class AbstractDataNodeOperator implements DataNodeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDataNodeOperator.class);
+
+    @Autowired
+    protected DataNodeEntityMapper dataNodeEntityMapper;
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Integer saveOpt(DataNodeRequest request, String operator) {
+        DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
+        // set the ext params
+        this.setTargetEntity(request, entity);
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        dataNodeEntityMapper.insert(entity);
+
+        return entity.getId();
+    }
+
+    /**
+     * Set the parameters of the target entity.
+     *
+     * @param request data node request
+     * @param targetEntity entity which will set the new parameters
+     */
+    protected abstract void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity);
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+    public void updateOpt(DataNodeRequest request, String operator) {
+        DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
+        // set the ext params
+        this.setTargetEntity(request, entity);
+        entity.setModifier(operator);
+        int rowCount = dataNodeEntityMapper.updateByIdSelective(entity);
+        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("data node has already updated with name={}, type={}, curVersion={}", request.getName(),
+                    request.getType(), request.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
new file mode 100644
index 000000000..ce579994e
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Interface of the data node operator.
+ */
+public interface DataNodeOperator {
+
+    /**
+     * Determines whether the current instance matches the specified type.
+     */
+    Boolean accept(String dataNodeType);
+
+    /**
+     * Get the data node type.
+     *
+     * @return data node type string
+     */
+    String getDataNodeType();
+
+    /**
+     * Save the data node info.
+     *
+     * @param request request of the data node
+     * @param operator name of the operator
+     * @return data node id after saving
+     */
+    Integer saveOpt(DataNodeRequest request, String operator);
+
+    /**
+     * Get the data node info from the given entity.
+     *
+     * @param entity get field value from the entity
+     * @return cluster info after encapsulating
+     */
+    DataNodeInfo getFromEntity(DataNodeEntity entity);
+
+    /**
+     * Update the data node info.
+     *
+     * @param request request of update
+     * @param operator name of operator
+     */
+    void updateOpt(DataNodeRequest request, String operator);
+}
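
AbstractDataNodeOperator is a template: the save/update flow lives in the
parent, and a concrete operator only fills in the type hooks and
setTargetEntity. A hypothetical operator for an imaginary "DEMO" node type,
assuming it sits in org.apache.inlong.manager.service.node alongside the
classes above:

    import org.springframework.stereotype.Service;

    import org.apache.inlong.manager.dao.entity.DataNodeEntity;
    import org.apache.inlong.manager.pojo.node.DataNodeInfo;
    import org.apache.inlong.manager.pojo.node.DataNodeRequest;

    @Service
    public class DemoDataNodeOperator extends AbstractDataNodeOperator {

        private static final String DEMO_TYPE = "DEMO"; // illustrative type name

        @Override
        public Boolean accept(String dataNodeType) {
            return DEMO_TYPE.equals(dataNodeType);
        }

        @Override
        public String getDataNodeType() {
            return DEMO_TYPE;
        }

        @Override
        public DataNodeInfo getFromEntity(DataNodeEntity entity) {
            // A real operator copies entity fields (plus parsed ext params)
            // into a type-specific DataNodeInfo, as the Hive operator in the
            // diffstat does; elided in this sketch.
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
            // Serialize any type-specific fields into the entity's ext params here.
        }
    }
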
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java
new file mode 100644
index 000000000..fba9d84d7
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory for {@link DataNodeOperator}.
+ */
+@Service
+public class DataNodeOperatorFactory {
+
+    @Autowired
+    private List<DataNodeOperator> dataNodeOperatorList;
+
+    /**
+     * Get a cluster operator instance via the given type
+     */
+    public DataNodeOperator getInstance(String type) {
+        return dataNodeOperatorList.stream()
+                .filter(inst -> inst.accept(type))
+                .findFirst()
+                .orElseThrow(() -> new BusinessException(
+                        String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(), type)));
+    }
+}
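
With the factory in place, saving reduces to a type lookup plus the operator's
template save, which is exactly what DataNodeServiceImpl does below. An
isolated sketch, assuming the same package and Spring wiring; the node name
and "admin" operator are illustrative:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;

    @Service
    public class DataNodeSaveSketch {

        @Autowired
        private DataNodeOperatorFactory operatorFactory;

        public Integer saveHiveNode() {
            HiveDataNodeRequest request = new HiveDataNodeRequest();
            request.setName("test_hive_node");
            // getType() already returns HIVE: the request constructor sets it.
            DataNodeOperator operator = operatorFactory.getInstance(request.getType());
            return operator.saveOpt(request, "admin");
        }
    }
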
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
similarity index 91%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
index 027fd1bf1..29422e81d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service.node;
 
 import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
 
 /**
  * Data node service layer interface
@@ -42,7 +42,7 @@ public interface DataNodeService {
      * @param id node id
      * @return node info
      */
-    DataNodeResponse get(Integer id);
+    DataNodeInfo get(Integer id);
 
     /**
      * Paging query nodes according to conditions.
@@ -50,7 +50,7 @@ public interface DataNodeService {
      * @param request page request conditions
      * @return node list
      */
-    PageInfo<DataNodeResponse> list(DataNodePageRequest request);
+    PageInfo<DataNodeInfo> list(DataNodePageRequest request);
 
     /**
      * Update data node.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
similarity index 83%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index e87edc293..893987663 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core.impl;
+package org.apache.inlong.manager.service.node;
 
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
@@ -24,14 +24,12 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
-import org.apache.inlong.manager.service.core.DataNodeService;
 import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +39,7 @@ import org.springframework.stereotype.Service;
 import java.sql.Connection;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Data node service layer implementation
@@ -52,6 +51,8 @@ public class DataNodeServiceImpl implements DataNodeService {
 
     @Autowired
     private DataNodeEntityMapper dataNodeMapper;
+    @Autowired
+    private DataNodeOperatorFactory operatorFactory;
 
     @Override
     public Integer save(DataNodeRequest request, String operator) {
@@ -65,33 +66,38 @@ public class DataNodeServiceImpl implements DataNodeService {
             LOGGER.error(errMsg);
             throw new BusinessException(errMsg);
         }
-        DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
-        entity.setCreator(operator);
-        entity.setModifier(operator);
-        dataNodeMapper.insert(entity);
-
+        // according to the data node type, save data node information
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
+        int id = dataNodeOperator.saveOpt(request, operator);
         LOGGER.debug("success to save data node={}", request);
-        return entity.getId();
+        return id;
     }
 
     @Override
-    public DataNodeResponse get(Integer id) {
+    public DataNodeInfo get(Integer id) {
         DataNodeEntity entity = dataNodeMapper.selectById(id);
         if (entity == null) {
             LOGGER.error("data node not found by id={}", id);
             throw new BusinessException("data node not found");
         }
-        DataNodeResponse response = CommonBeanUtils.copyProperties(entity, DataNodeResponse::new);
+
+        String dataNodeType = entity.getType();
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType);
+        DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity);
         LOGGER.debug("success to get data node info by id={}", id);
-        return response;
+        return dataNodeInfo;
     }
 
     @Override
-    public PageInfo<DataNodeResponse> list(DataNodePageRequest request) {
+    public PageInfo<DataNodeInfo> list(DataNodePageRequest request) {
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         Page<DataNodeEntity> entityPage = (Page<DataNodeEntity>) dataNodeMapper.selectByCondition(request);
-        List<DataNodeResponse> responseList = CommonBeanUtils.copyListProperties(entityPage, DataNodeResponse::new);
-        PageInfo<DataNodeResponse> page = new PageInfo<>(responseList);
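+        // map each entity to its type-specific node info via the operator factory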
+        List<DataNodeInfo> list = entityPage.stream()
+                .map(entity -> {
+                    DataNodeOperator dataNodeOperator = operatorFactory.getInstance(entity.getType());
+                    return dataNodeOperator.getFromEntity(entity);
+                }).collect(Collectors.toList());
+        PageInfo<DataNodeInfo> page = new PageInfo<>(list);
         page.setTotal(entityPage.getTotal());
         LOGGER.debug("success to list data node by {}", request);
         return page;
@@ -101,9 +107,8 @@ public class DataNodeServiceImpl implements DataNodeService {
     public Boolean update(DataNodeRequest request, String operator) {
         String name = request.getName();
         String type = request.getType();
-
-        Integer id = request.getId();
         DataNodeEntity exist = dataNodeMapper.selectByNameAndType(name, type);
+        Integer id = request.getId();
         if (exist != null && !Objects.equals(id, exist.getId())) {
             String errMsg = String.format("data node already exist for name=%s type=%s", name, type);
             LOGGER.error(errMsg);
@@ -121,13 +126,8 @@ public class DataNodeServiceImpl implements DataNodeService {
             LOGGER.error(errMsg);
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
-        CommonBeanUtils.copyProperties(request, entity, true);
-        entity.setModifier(operator);
-        int rowCount = dataNodeMapper.updateById(entity);
-        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.error(errMsg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
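+        // delegate the update to the operator that matches the data node type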
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
+        dataNodeOperator.updateOpt(request, operator);
         LOGGER.info("success to update data node={}", request);
         return true;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
new file mode 100644
index 000000000..7cb25bd5f
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.hive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class HiveDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.HIVE;
+    }
+
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        HiveDataNodeInfo hiveDataNodeInfo = new HiveDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, hiveDataNodeInfo);
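+        // Hive-specific properties are stored as JSON in extParams and copied onto the info object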
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            HiveDataNodeDTO dto = HiveDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, hiveDataNodeInfo);
+        }
+
+        LOGGER.debug("success to get hive data node from entity");
+        return hiveDataNodeInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+        HiveDataNodeRequest hiveDataNodeRequest = (HiveDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(hiveDataNodeRequest, targetEntity, true);
+        try {
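+            // serialize the Hive-specific fields back into the extParams JSON column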
+            HiveDataNodeDTO dto = HiveDataNodeDTO.getFromRequest(hiveDataNodeRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            LOGGER.debug("success to set entity for hive data node");
+        } catch (Exception e) {
+            LOGGER.error("failed to set entity for hive data node: ", e);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index a771b2d32..2e790fff1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -26,15 +26,15 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.sink.SinkField;
-import org.apache.inlong.manager.pojo.sink.SinkRequest;
-import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -186,6 +186,21 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
         LOGGER.info("success to save sink fields");
     }
 
+    @Override
+    public void deleteOpt(StreamSinkEntity entity, String operator) {
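+        // logical deletion: keep the row but mark it deleted and record who modified it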
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(InlongConstants.DELETED_STATUS);
+        entity.setIsDeleted(entity.getId());
+        entity.setModifier(operator);
+        int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
+                    entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
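+        // also logically delete all fields of this sink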
+        sinkFieldMapper.logicDeleteAll(entity.getId());
+    }
+
     /**
      * Check the validity of sink fields.
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index 69e3b6c1c..e8231c419 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -89,4 +89,11 @@ public interface StreamSinkOperator {
      */
     void updateFieldOpt(Boolean onlyAdd, SinkRequest request);
 
+    /**
+     * Delete the sink info.
+     *
+     * @param entity sink entity that needs to be deleted
+     * @param operator name of the operator
+     */
+    void deleteOpt(StreamSinkEntity entity, String operator);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 311fd9329..f5860fb9e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -55,7 +55,6 @@ import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -106,13 +105,13 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         }
 
         // According to the sink type, save sink information
-        StreamSinkOperator operation = operatorFactory.getInstance(request.getSinkType());
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
         List<SinkField> fields = request.getSinkFieldList();
         // Remove id in sinkField when save
         if (CollectionUtils.isNotEmpty(fields)) {
             fields.forEach(sinkField -> sinkField.setId(null));
         }
-        int id = operation.saveOpt(request, operator);
+        int id = sinkOperator.saveOpt(request, operator);
 
         LOGGER.info("success to save sink info: {}", request);
         return id;
@@ -126,8 +125,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             LOGGER.error("sink not found by id={}", id);
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
         }
-        StreamSinkOperator operation = operatorFactory.getInstance(entity.getSinkType());
-        StreamSink streamSink = operation.getFromEntity(entity);
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
+        StreamSink streamSink = sinkOperator.getFromEntity(entity);
         LOGGER.debug("success to get sink info by id={}", id);
         return streamSink;
     }
@@ -191,8 +190,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         }
         List<StreamSink> responseList = Lists.newArrayList();
         for (Map.Entry<String, Page<StreamSinkEntity>> entry : sinkMap.entrySet()) {
-            StreamSinkOperator operation = operatorFactory.getInstance(entry.getKey());
-            PageInfo<? extends StreamSink> pageInfo = operation.getPageInfo(entry.getValue());
+            StreamSinkOperator sinkOperator = operatorFactory.getInstance(entry.getKey());
+            PageInfo<? extends StreamSink> pageInfo = sinkOperator.getPageInfo(entry.getValue());
             responseList.addAll(pageInfo.getList());
         }
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
@@ -230,8 +229,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             fields.forEach(sinkField -> sinkField.setId(null));
         }
 
-        StreamSinkOperator operation = operatorFactory.getInstance(request.getSinkType());
-        operation.updateOpt(request, operator);
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
+        sinkOperator.updateOpt(request, operator);
 
         // The inlong group status is [Configuration successful], then asynchronously initiate
         // the [Single inlong stream resource creation] workflow
@@ -266,20 +265,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
-
-        entity.setPreviousStatus(entity.getStatus());
-        entity.setStatus(InlongConstants.DELETED_STATUS);
-        entity.setIsDeleted(id);
-        entity.setModifier(operator);
-        entity.setModifyTime(new Date());
-        int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
-        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
-                    entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion());
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
-        sinkFieldMapper.logicDeleteAll(id);
-
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
+        sinkOperator.deleteOpt(entity, operator);
         LOGGER.info("success to delete sink info: {}", entity);
         return true;
     }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
index a35aef416..41936336b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
@@ -18,11 +18,12 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
 import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.DataNodeService;
+import org.apache.inlong.manager.service.node.DataNodeService;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -39,7 +40,7 @@ public class DataNodeServiceTest extends ServiceBaseTest {
      * Save data node info.
      */
     public Integer saveOpt(String nodeName, String type, String url, String username, String password) {
-        DataNodeRequest request = new DataNodeRequest();
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
         request.setName(nodeName);
         request.setType(type);
         request.setUrl(url);
@@ -47,13 +48,15 @@ public class DataNodeServiceTest extends ServiceBaseTest {
         request.setToken(password);
         request.setDescription("test cluster");
         request.setInCharges(GLOBAL_OPERATOR);
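+        // Hive data nodes also carry a JDBC URL and token (this token overrides the one set above)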
+        request.setJdbcUrl("127.0.0.1");
+        request.setToken("123456");
         return dataNodeService.save(request, GLOBAL_OPERATOR);
     }
 
     /**
      * Get data node list info.
      */
-    public PageInfo<DataNodeResponse> listOpt(String type, String name) {
+    public PageInfo<DataNodeInfo> listOpt(String type, String name) {
         DataNodePageRequest request = new DataNodePageRequest();
         request.setType(type);
         request.setName(name);
@@ -65,7 +68,7 @@ public class DataNodeServiceTest extends ServiceBaseTest {
      */
     public Boolean updateOpt(Integer id, String nodeName, String type, String url, String username, String password,
             Integer version) {
-        DataNodeRequest request = new DataNodeRequest();
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
         request.setId(id);
         request.setName(nodeName);
         request.setType(type);
@@ -86,7 +89,7 @@ public class DataNodeServiceTest extends ServiceBaseTest {
     @Test
     public void testDataService() {
         String nodeName = "hiveNode1";
-        String type = "HIVE";
+        String type = DataNodeType.HIVE;
         String url = "127.0.0.1:8080";
         String usename = "admin";
         String password = "123";
@@ -96,17 +99,17 @@ public class DataNodeServiceTest extends ServiceBaseTest {
         Assertions.assertNotNull(id);
 
         // test get data node
-        DataNodeResponse nodeResponse = dataNodeService.get(id);
-        Assertions.assertNotNull(nodeResponse);
-        Assertions.assertEquals(type, nodeResponse.getType());
+        DataNodeInfo dataNodeInfo = dataNodeService.get(id);
+        Assertions.assertNotNull(dataNodeInfo);
+        Assertions.assertEquals(type, dataNodeInfo.getType());
 
         // test get data node list
-        PageInfo<DataNodeResponse> listDataNode = this.listOpt(type, nodeName);
+        PageInfo<DataNodeInfo> listDataNode = this.listOpt(type, nodeName);
         Assertions.assertEquals(listDataNode.getTotal(), 1);
 
         // test update data node
-        String newNodeName = "kafkaNode1";
-        String newType = "KAFKA";
+        String newNodeName = "hiveNode2";
+        String newType = DataNodeType.HIVE;
         String newUrl = "127.0.0.1:8083";
         String newUsername = "admin2";
         String newPassword = "456";
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
index a8c8a81bd..802181c00 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
@@ -22,13 +22,13 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
-import org.apache.inlong.manager.service.core.DataNodeService;
+import org.apache.inlong.manager.service.node.DataNodeService;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
 import org.apache.inlong.manager.service.user.LoginUserUtils;
 import org.apache.shiro.authz.annotation.RequiresRoles;
@@ -65,13 +65,13 @@ public class DataNodeController {
     @GetMapping(value = "/node/get/{id}")
     @ApiOperation(value = "Get node by id")
     @ApiImplicitParam(name = "id", value = "Data node ID", dataTypeClass = Integer.class, required = true)
-    public Response<DataNodeResponse> get(@PathVariable Integer id) {
+    public Response<DataNodeInfo> get(@PathVariable Integer id) {
         return Response.success(dataNodeService.get(id));
     }
 
     @PostMapping(value = "/node/list")
     @ApiOperation(value = "List data node")
-    public Response<PageInfo<DataNodeResponse>> list(@RequestBody DataNodePageRequest request) {
+    public Response<PageInfo<DataNodeInfo>> list(@RequestBody DataNodePageRequest request) {
         return Response.success(dataNodeService.list(request));
     }
 
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
index 99223394f..ba7b10353 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.inlong.manager.web.controller;
 
-import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.DataNodeResponse;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
 import org.apache.inlong.manager.web.WebBaseTest;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -36,15 +38,15 @@ class DataNodeControllerTest extends WebBaseTest {
     @Resource
     DataNodeEntityMapper dataNodeEntityMapper;
 
-    DataNodeRequest getDataNodeRequest() {
-        return DataNodeRequest.builder()
-                .name("hiveNode1")
-                .type("HIVE")
-                .url("127.0.0.1:8080")
-                .username("admin")
-                .token("123")
-                .inCharges("admin")
-                .build();
+    HiveDataNodeRequest getHiveDataNodeRequest() {
+        HiveDataNodeRequest hiveDataNodeRequest = new HiveDataNodeRequest();
+        hiveDataNodeRequest.setName("hiveNode1");
+        hiveDataNodeRequest.setType(DataNodeType.HIVE);
+        hiveDataNodeRequest.setUrl("127.0.0.1:8080");
+        hiveDataNodeRequest.setUsername("admin");
+        hiveDataNodeRequest.setToken("123");
+        hiveDataNodeRequest.setInCharges("admin");
+        return hiveDataNodeRequest;
     }
 
     @Test
@@ -52,7 +54,7 @@ class DataNodeControllerTest extends WebBaseTest {
         logout();
         operatorLogin();
 
-        MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getDataNodeRequest());
+        MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getHiveDataNodeRequest());
 
         Response<Integer> response = getResBody(mvcResult, Integer.class);
         Assertions.assertEquals("Current user [operator] has no permission to access URL", response.getErrMsg());
@@ -61,7 +63,7 @@ class DataNodeControllerTest extends WebBaseTest {
     @Test
     void testSaveAndGetAndDelete() throws Exception {
         // save
-        MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getDataNodeRequest());
+        MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getHiveDataNodeRequest());
 
         Integer dataNodeId = getResBodyObj(mvcResult, Integer.class);
         Assertions.assertNotNull(dataNodeId);
@@ -71,7 +73,7 @@ class DataNodeControllerTest extends WebBaseTest {
 
         DataNodeResponse dataNode = getResBodyObj(getResult, DataNodeResponse.class);
         Assertions.assertNotNull(dataNode);
-        Assertions.assertEquals(getDataNodeRequest().getName(), dataNode.getName());
+        Assertions.assertEquals(getHiveDataNodeRequest().getName(), dataNode.getName());
 
         // delete
         MvcResult deleteResult = deleteForSuccessMvcResult("/api/node/delete/{id}", dataNodeId);
@@ -99,7 +101,7 @@ class DataNodeControllerTest extends WebBaseTest {
 
         dataNodeEntityMapper.insert(nodeEntity);
 
-        DataNodeRequest request = getDataNodeRequest();
+        DataNodeRequest request = getHiveDataNodeRequest();
         request.setId(nodeEntity.getId());
         request.setName("test447777");
         request.setVersion(nodeEntity.getVersion());
@@ -114,7 +116,7 @@ class DataNodeControllerTest extends WebBaseTest {
 
     @Test
     void testUpdateFailByNoId() throws Exception {
-        MvcResult mvcResult = postForSuccessMvcResult("/api/node/update", getDataNodeRequest());
+        MvcResult mvcResult = postForSuccessMvcResult("/api/node/update", getHiveDataNodeRequest());
 
         Response<Boolean> response = getResBody(mvcResult, Boolean.class);
         Assertions.assertFalse(response.isSuccess());