You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/31 06:11:16 UTC

[inlong] branch master updated: [INLONG-6288][Manager] Refactor getSortClusterConfig by using MyBatis Cursor (#6294)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 48d8fce48 [INLONG-6288][Manager] Refactor getSortClusterConfig by using MyBatis Cursor (#6294)
48d8fce48 is described below

commit 48d8fce48fc4530f775febb8e93a796e044898c5
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Oct 31 14:11:11 2022 +0800

    [INLONG-6288][Manager] Refactor getSortClusterConfig by using MyBatis Cursor (#6294)
---
 .../manager/dao/mapper/DataNodeEntityMapper.java   |   6 +
 .../manager/dao/mapper/StreamSinkEntityMapper.java |   9 +-
 .../dao/mapper/StreamSinkFieldEntityMapper.java    |   7 +
 .../resources/mappers/DataNodeEntityMapper.xml     |   7 +-
 .../resources/mappers/StreamSinkEntityMapper.xml   |  13 +-
 .../mappers/StreamSinkFieldEntityMapper.xml        |  10 +-
 .../pojo/sort/standalone/SortFieldInfo.java        |  27 ++++
 .../manager/service/core/SortConfigLoader.java     |  28 ++++
 .../service/core/impl/SortClusterServiceImpl.java  | 176 +++++++++------------
 .../service/core/impl/SortConfigLoaderImpl.java    |  46 ++++++
 .../service/core/impl/SortSourceServiceImpl.java   |   7 +
 .../service/node/AbstractDataNodeOperator.java     |  10 ++
 .../manager/service/node/DataNodeOperator.java     |   9 ++
 .../manager/service/sink/AbstractSinkOperator.java |  26 ++-
 .../manager/service/sink/StreamSinkOperator.java   |   9 ++
 .../manager/service/sort/SortServiceImplTest.java  |  72 +++++----
 16 files changed, 312 insertions(+), 150 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
index ac17f283e..e246f4931 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
@@ -17,7 +17,10 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
+import org.apache.ibatis.annotations.Options;
 import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.cursor.Cursor;
+import org.apache.ibatis.mapping.ResultSetType;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSinkInfo;
@@ -36,6 +39,9 @@ public interface DataNodeEntityMapper {
 
     List<DataNodeEntity> selectByCondition(DataNodePageRequest request);
 
+    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+    Cursor<DataNodeEntity> selectAllDataNodes();
+
     List<SortSinkInfo> selectAllSinkParams();
 
     int updateById(DataNodeEntity record);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 20fbd9e55..63dbf098d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -120,21 +120,24 @@ public interface StreamSinkEntityMapper {
      */
     List<SinkInfo> selectAllConfig(@Param("groupId") String groupId, @Param("idList") List<String> streamIdList);
 
-    List<StreamSinkEntity> selectAllStreamSinks();
+    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+    Cursor<StreamSinkEntity> selectAllStreamSinks();
 
     /**
      * Select all tasks for sort-standalone
      *
      * @return All tasks
      */
-    List<SortTaskInfo> selectAllTasks();
+    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+    Cursor<SortTaskInfo> selectAllTasks();
 
     /**
      * Select all id params for sort-standalone
      *
      * @return All id params
      */
-    List<SortIdInfo> selectAllIdParams();
+    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+    Cursor<SortIdInfo> selectAllIdParams();
 
     /**
      * Select all streams for sort sdk.
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
index b487deda7..fbba1fce0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
@@ -17,8 +17,12 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
+import org.apache.ibatis.annotations.Options;
 import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.cursor.Cursor;
+import org.apache.ibatis.mapping.ResultSetType;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
 import org.springframework.stereotype.Repository;
 
 import java.util.List;
@@ -34,6 +38,9 @@ public interface StreamSinkFieldEntityMapper {
 
     List<StreamSinkFieldEntity> selectFields(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
+    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+    Cursor<SortFieldInfo> selectAllFields();
+
     /**
      * According to the sink id, query the sink field.
      *
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
index 90ceccaf3..3cfcbf398 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
@@ -100,7 +100,12 @@
         from data_node
         where is_deleted = 0
     </select>
-
+    <select id="selectAllDataNodes" resultType="org.apache.inlong.manager.dao.entity.DataNodeEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from data_node
+        where is_deleted = 0
+    </select>
     <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.DataNodeEntity">
         update data_node
         set name        = #{name, jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index aac142d59..f6422643c 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -350,12 +350,6 @@
             </if>
         </where>
     </select>
-    <select id="selectAllStreamSinks" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
-        select
-        <include refid="Base_Column_List"/>
-        from stream_sink
-        where is_deleted = 0
-    </select>
     <select id="selectAllTasks" resultType="org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo">
         select inlong_cluster_name as sortClusterName,
                sort_task_name,
@@ -382,7 +376,12 @@
         from stream_sink
         where is_deleted = 0
     </select>
-
+    <select id="selectAllStreamSinks" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_sink
+        where is_deleted = 0
+    </select>
     <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
         update stream_sink
         <set>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index 8c85171ff..8273c3334 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -119,7 +119,15 @@
           and field.is_deleted = 0
           and sink.is_deleted = 0
     </select>
-
+    <select id="selectAllFields" resultType="org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo">
+        select
+            inlong_group_id,
+            inlong_stream_id,
+            field_name
+        from stream_sink_field
+        where is_deleted = 0
+        order by id asc
+    </select>
     <update id="logicDeleteAll">
         update stream_sink_field
         set is_deleted = id
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java
new file mode 100644
index 000000000..f686ed065
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.standalone;
+
+import lombok.Data;
+
+@Data
+public class SortFieldInfo {
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String fieldName;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
index 7bfa2a8ec..6b0a02fc5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
@@ -17,12 +17,16 @@
 
 package org.apache.inlong.manager.service.core;
 
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
+import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
 
 import java.util.List;
 
@@ -67,4 +71,28 @@ public interface SortConfigLoader {
      * @return List of stream info
      */
     List<SortSourceStreamInfo> loadAllStreams();
+
+    /**
+     * Load all inlong stream sink entity by cursor
+     * @return List of stream sink entity
+     */
+    List<StreamSinkEntity> loadAllStreamSinkEntity();
+
+    /**
+     * Load all task info
+     * @return List of tasks
+     */
+    List<SortTaskInfo> loadAllTask();
+
+    /**
+     * Load all data node entity
+     * @return List of data node
+     */
+    List<DataNodeEntity> loadAllDataNodeEntity();
+
+    /**
+     * Load all fields info
+     * @return List of fields info
+     */
+    List<SortFieldInfo> loadAllFields();
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index b9584cf96..e55aa3ea8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -23,12 +23,17 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import org.apache.inlong.manager.pojo.sort.standalone.SortIdInfo;
-import org.apache.inlong.manager.pojo.sort.standalone.SortSinkInfo;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
-import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
-import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.service.core.SortClusterService;
+import org.apache.inlong.manager.service.core.SortConfigLoader;
+import org.apache.inlong.manager.service.node.DataNodeOperator;
+import org.apache.inlong.manager.service.node.DataNodeOperatorFactory;
+import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
+import org.apache.inlong.manager.service.sink.StreamSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -37,6 +42,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.PostConstruct;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +73,7 @@ public class SortClusterServiceImpl implements SortClusterService {
 
     private static final String KEY_GROUP_ID = "inlongGroupId";
     private static final String KEY_STREAM_ID = "inlongStreamId";
+    private Map<String, List<String>> fieldMap;
 
     // key : sort cluster name, value : md5
     private Map<String, String> sortClusterMd5Map = new ConcurrentHashMap<>();
@@ -78,9 +85,11 @@ public class SortClusterServiceImpl implements SortClusterService {
     private long reloadInterval;
 
     @Autowired
-    private StreamSinkEntityMapper streamSinkEntityMapper;
+    private SortConfigLoader sortConfigLoader;
     @Autowired
-    private DataNodeEntityMapper dataNodeEntityMapper;
+    private SinkOperatorFactory sinkOperatorFactory;
+    @Autowired
+    private DataNodeOperatorFactory dataNodeOperatorFactory;
 
     @PostConstruct
     public void initialize() {
@@ -98,7 +107,7 @@ public class SortClusterServiceImpl implements SortClusterService {
     public void reload() {
         LOGGER.debug("start to reload sort config");
         try {
-            reloadAllClusterConfig();
+            reloadAllClusterConfigV2();
         } catch (Throwable t) {
             LOGGER.error(t.getMessage(), t);
         }
@@ -152,78 +161,83 @@ public class SortClusterServiceImpl implements SortClusterService {
                 .build();
     }
 
-    /**
-     * Reload all cluster config.
-     * The results including config, md5 and error log, will replace the older ones.
-     */
-    private void reloadAllClusterConfig() {
-        // get all task and group by cluster
-        List<SortTaskInfo> tasks = streamSinkEntityMapper.selectAllTasks();
+    private void reloadAllClusterConfigV2() {
+        // load all fields info
+        List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
+        fieldMap = new HashMap<>();
+        fieldInfos.forEach(info -> {
+            List<String> fields = fieldMap.computeIfAbsent(info.getInlongGroupId(), k -> new ArrayList<>());
+            fields.add(info.getFieldName());
+        });
+
+        List<StreamSinkEntity> sinkEntities = sortConfigLoader.loadAllStreamSinkEntity();
+        // get all task under a given cluster, has been reduced into cluster and task.
+        List<SortTaskInfo> tasks = sortConfigLoader.loadAllTask();
         Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
                 .filter(dto -> dto.getSortClusterName() != null)
                 .collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
 
-        // get all id params and group by task
-        List<SortIdInfo> idParams = streamSinkEntityMapper.selectAllIdParams();
-        Map<String, List<SortIdInfo>> taskIdParamMap = idParams.stream()
-                .filter(dto -> dto.getSortTaskName() != null)
-                .collect(Collectors.groupingBy(SortIdInfo::getSortTaskName));
-
-        // get all sink params and group by data node name
-        List<SortSinkInfo> sinkParams = dataNodeEntityMapper.selectAllSinkParams();
-        Map<String, SortSinkInfo> taskSinkParamMap = sinkParams.stream()
-                .filter(dto -> dto.getName() != null)
-                .collect(Collectors.toMap(SortSinkInfo::getName, param -> param));
+        // get all stream sinks
+        Map<String, List<StreamSinkEntity>> task2AllStreams = sinkEntities.stream()
+                .filter(entity -> StringUtils.isNotBlank(entity.getInlongClusterName()))
+                .collect(Collectors.groupingBy(StreamSinkEntity::getSinkName));
+
+        // get all data nodes and group by node name
+        List<DataNodeEntity> dataNodeEntities = sortConfigLoader.loadAllDataNodeEntity();
+        Map<String, DataNodeInfo> task2DataNodeMap = dataNodeEntities.stream()
+                .filter(entity -> StringUtils.isNotBlank(entity.getName()))
+                .map(entity -> {
+                    DataNodeOperator operator = dataNodeOperatorFactory.getInstance(entity.getType());
+                    return operator.getFromEntity(entity);
+                })
+                .collect(Collectors.toMap(DataNodeInfo::getName, info -> info));
 
-        // update config of each cluster
+        // re-org all SortClusterConfigs
         Map<String, SortClusterConfig> newConfigMap = new ConcurrentHashMap<>();
         Map<String, String> newMd5Map = new ConcurrentHashMap<>();
         Map<String, String> newErrorLogMap = new ConcurrentHashMap<>();
+
         clusterTaskMap.forEach((clusterName, taskList) -> {
             try {
-                // get config, then update config map and md5
-                SortClusterConfig clusterConfig = getConfigByClusterName(clusterName, taskList, taskIdParamMap,
-                        taskSinkParamMap);
-                String jsonStr = GSON.toJson(clusterConfig);
+                SortClusterConfig config = this.getConfigByClusterNameV2(clusterName,
+                        taskList, task2AllStreams, task2DataNodeMap);
+                String jsonStr = GSON.toJson(config);
                 String md5 = DigestUtils.md5Hex(jsonStr);
-                newConfigMap.put(clusterName, clusterConfig);
+                newConfigMap.put(clusterName, config);
                 newMd5Map.put(clusterName, md5);
             } catch (Throwable e) {
                 // if get config failed, update the err log.
                 newErrorLogMap.put(clusterName, e.getMessage());
-                LOGGER.error("Failed to update cluster config of {}, error is {}", clusterName, e.getMessage());
-                LOGGER.error(e.getMessage(), e);
+                LOGGER.error("Failed to update cluster config={}, error={}", clusterName, e.getMessage());
             }
         });
+
         sortClusterErrorLogMap = newErrorLogMap;
         sortClusterConfigMap = newConfigMap;
         sortClusterMd5Map = newMd5Map;
     }
 
-    /**
-     * Get the latest config of specific cluster.
-     *
-     * @param clusterName Cluster name.
-     * @param tasks Task in this cluster.
-     * @param taskIdParamMap All id params.
-     * @param taskSinkParamMap All sink params.
-     * @return The sort cluster config of specific cluster.
-     */
-    private SortClusterConfig getConfigByClusterName(
+    private SortClusterConfig getConfigByClusterNameV2(
             String clusterName,
             List<SortTaskInfo> tasks,
-            Map<String, List<SortIdInfo>> taskIdParamMap,
-            Map<String, SortSinkInfo> taskSinkParamMap) {
+            Map<String, List<StreamSinkEntity>> task2AllStreams,
+            Map<String, DataNodeInfo> task2DataNodeMap) {
 
         List<SortTaskConfig> taskConfigs = tasks.stream()
                 .map(task -> {
                     String taskName = task.getSortTaskName();
                     String type = task.getSinkType();
-                    List<SortIdInfo> idParams = taskIdParamMap.get(taskName);
-                    SortSinkInfo sinkParams = taskSinkParamMap.get(task.getDataNodeName());
-                    return this.getTaskConfig(taskName, type, idParams, sinkParams);
+                    String dataNodeName = task.getDataNodeName();
+                    DataNodeInfo nodeInfo = task2DataNodeMap.get(dataNodeName);
+                    List<StreamSinkEntity> streams = task2AllStreams.get(taskName);
+
+                    return SortTaskConfig.builder()
+                            .name(taskName)
+                            .type(type)
+                            .idParams(this.parseIdParamsV2(streams))
+                            .sinkParams(this.parseSinkParamsV2(nodeInfo))
+                            .build();
                 })
-                .filter(Objects::nonNull)
                 .collect(Collectors.toList());
 
         return SortClusterConfig.builder()
@@ -232,64 +246,20 @@ public class SortClusterServiceImpl implements SortClusterService {
                 .build();
     }
 
-    /**
-     * Get task config.
-     * <p/>
-     * If there is not any id or sink params, throw exception to upper caller.
-     *
-     * @param taskName Task name.
-     * @param type Type of sink.
-     * @param idParams Id params.
-     * @param sinkParams Sink params.
-     * @return Task config.
-     */
-    private SortTaskConfig getTaskConfig(String taskName, String type, List<SortIdInfo> idParams,
-            SortSinkInfo sinkParams) {
-        // return null if id params or sink params are empty.
-        if (idParams == null || sinkParams == null) {
-            return null;
-        }
-
-        if (!type.equalsIgnoreCase(sinkParams.getType())) {
-            throw new IllegalArgumentException(
-                    String.format("task type %s and sink type %s are not identical for task name %s",
-                            type, sinkParams.getType(), taskName));
-        }
-
-        return SortTaskConfig.builder()
-                .name(taskName)
-                .type(type)
-                .idParams(this.parseIdParams(idParams))
-                .sinkParams(this.parseSinkParams(sinkParams))
-                .build();
-    }
-
-    /**
-     * Parse id params from json.
-     *
-     * @param rowIdParams IdParams in json format.
-     * @return List of IdParams.
-     */
-    private List<Map<String, String>> parseIdParams(List<SortIdInfo> rowIdParams) {
-        return rowIdParams.stream()
-                .map(row -> {
-                    Map<String, String> param = GSON.fromJson(row.getExtParams(), HashMap.class);
-                    // put group and stream info
-                    param.put(KEY_GROUP_ID, row.getInlongGroupId());
-                    param.put(KEY_STREAM_ID, row.getInlongStreamId());
-                    return param;
+    private List<Map<String, String>> parseIdParamsV2(List<StreamSinkEntity> streams) {
+        return streams.stream()
+                .map(streamSink -> {
+                    StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType());
+                    List<String> fields = fieldMap.get(streamSink.getInlongGroupId());
+                    return operator.parse2IdParams(streamSink, fields);
                 })
+                .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
 
-    /**
-     * Parse sink params from json.
-     *
-     * @param rowSinkParams Sink params in json format.
-     * @return Sink params.
-     */
-    private Map<String, String> parseSinkParams(SortSinkInfo rowSinkParams) {
-        return GSON.fromJson(rowSinkParams.getExtParams(), HashMap.class);
+    private Map<String, String> parseSinkParamsV2(DataNodeInfo nodeInfo) {
+        DataNodeOperator operator = dataNodeOperatorFactory.getInstance(nodeInfo.getType());
+        return operator.parse2SinkParams(nodeInfo);
     }
 
     /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
index f442f9a46..100c53d4f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
@@ -18,18 +18,24 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import org.apache.ibatis.cursor.Cursor;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
+import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
 import org.apache.inlong.manager.service.core.SortConfigLoader;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -45,6 +51,8 @@ public class SortConfigLoaderImpl implements SortConfigLoader {
     @Autowired
     private StreamSinkEntityMapper streamSinkEntityMapper;
     @Autowired
+    private StreamSinkFieldEntityMapper streamSinkFieldEntityMapper;
+    @Autowired
     private InlongGroupEntityMapper inlongGroupEntityMapper;
     @Autowired
     private InlongGroupExtEntityMapper inlongGroupExtEntityMapper;
@@ -52,6 +60,8 @@ public class SortConfigLoaderImpl implements SortConfigLoader {
     private InlongStreamExtEntityMapper inlongStreamExtEntityMapper;
     @Autowired
     private InlongStreamEntityMapper inlongStreamEntityMapper;
+    @Autowired
+    private DataNodeEntityMapper dataNodeEntityMapper;
 
     @Transactional
     @Override
@@ -106,4 +116,40 @@ public class SortConfigLoaderImpl implements SortConfigLoader {
         cursor.forEach(allStreams::add);
         return allStreams;
     }
+
+    @Transactional
+    @Override
+    public List<StreamSinkEntity> loadAllStreamSinkEntity() {
+        Cursor<StreamSinkEntity> cursor = streamSinkEntityMapper.selectAllStreamSinks();
+        List<StreamSinkEntity> allStreamSinks = new ArrayList<>();
+        cursor.forEach(allStreamSinks::add);
+        return allStreamSinks;
+    }
+
+    @Transactional
+    @Override
+    public List<SortTaskInfo> loadAllTask() {
+        Cursor<SortTaskInfo> cursor = streamSinkEntityMapper.selectAllTasks();
+        List<SortTaskInfo> allTasks = new ArrayList<>();
+        cursor.forEach(allTasks::add);
+        return allTasks;
+    }
+
+    @Transactional
+    @Override
+    public List<DataNodeEntity> loadAllDataNodeEntity() {
+        Cursor<DataNodeEntity> cursor = dataNodeEntityMapper.selectAllDataNodes();
+        List<DataNodeEntity> allDataNodes = new ArrayList<>();
+        cursor.forEach(allDataNodes::add);
+        return allDataNodes;
+    }
+
+    @Transactional
+    @Override
+    public List<SortFieldInfo> loadAllFields() {
+        Cursor<SortFieldInfo> cursor = streamSinkFieldEntityMapper.selectAllFields();
+        List<SortFieldInfo> allFields = new ArrayList<>();
+        cursor.forEach(allFields::add);
+        return allFields;
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index cadf713c5..9059a1b3e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -268,6 +268,13 @@ public class SortSourceServiceImpl implements SortSourceService {
         });
         sortSourceConfigMap = newConfigMap;
         sortSourceMd5Map = newMd5Map;
+        mqClusters = null;
+        groupInfos = null;
+        allStreams = null;
+        backupClusterTag = null;
+        backupGroupMqResource = null;
+        backupStreamMqResource = null;
+        groupMap = null;
     }
 
     private Map<String, CacheZone> parseCacheZones(
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
index 0ccea9162..51eeb91f6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -21,8 +21,10 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +32,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Default operation of data node.
  */
@@ -75,4 +80,9 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
     }
+
+    /**
+     * Default conversion: reads the ext params of the data node and lets
+     * {@code JsonUtils.parseObject} turn them into a {@link HashMap} that is returned as the sink params.
+     */
+    @Override
+    public Map<String, String> parse2SinkParams(DataNodeInfo info) {
+        final String extParamsText = info.getExtParams();
+        final Map<String, String> sinkParams = JsonUtils.parseObject(extParamsText, HashMap.class);
+        return sinkParams;
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
index ce579994e..aeae290b7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -21,6 +21,8 @@ import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 
+import java.util.Map;
+
 /**
  * Interface of the data node operator.
  */
@@ -62,4 +64,11 @@ public interface DataNodeOperator {
      * @param operator name of operator
      */
     void updateOpt(DataNodeRequest request, String operator);
+
+    /**
+     * Parse the data node info into the sink params used by sort-standalone.
+     *
+     * @param info data node info to convert
+     * @return sink params as a key-value map
+     */
+    Map<String, String> parse2SinkParams(DataNodeInfo info);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index db6cd23aa..35f0b6b5f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.service.sink;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -26,6 +25,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
@@ -41,7 +41,9 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
@@ -52,8 +54,9 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSinkOperator.class);
 
-    @Autowired
-    protected ObjectMapper objectMapper;
+    protected static final String KEY_GROUP_ID = "inlongGroupId";
+    protected static final String KEY_STREAM_ID = "inlongStreamId";
+
     @Autowired
     protected StreamSinkEntityMapper sinkMapper;
     @Autowired
@@ -213,6 +216,23 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
         sinkFieldMapper.logicDeleteAll(entity.getId());
     }
 
+    /**
+     * Build the id params of a stream sink from its ext params, then add the group id and stream id.
+     *
+     * @param streamSink sink entity whose ext params are parsed
+     * @param fields extra field list supplied by the caller; ignored by this default implementation
+     * @return the parsed ext params plus {@code inlongGroupId} and {@code inlongStreamId},
+     *         or null if the ext params cannot be parsed
+     */
+    @Override
+    public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields) {
+        Map<String, String> param;
+        try {
+            param = JsonUtils.parseObject(streamSink.getExtParams(), HashMap.class);
+        } catch (Exception e) {
+            // the trailing throwable has no placeholder of its own, so SLF4J logs it with its stack trace
+            LOGGER.error("cannot parse properties of groupId={}, streamId={}, sinkName={}, the row properties is={}, "
+                            + "exception={}", streamSink.getInlongGroupId(), streamSink.getInlongStreamId(),
+                    streamSink.getSinkName(), streamSink.getExtParams(), e.getMessage(), e);
+            return null;
+        }
+        // the parser may hand back null (for instance when a sink carries no ext params);
+        // fall back to an empty map so the ids below can still be added instead of failing with an NPE
+        if (param == null) {
+            param = new HashMap<>();
+        }
+        // put group and stream info
+        param.put(KEY_GROUP_ID, streamSink.getInlongGroupId());
+        param.put(KEY_STREAM_ID, streamSink.getInlongStreamId());
+        return param;
+    }
+
     /**
      * Check the validity of sink fields.
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index 71ed90842..a2d933595 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Interface of the sink operator
@@ -98,4 +99,12 @@ public interface StreamSinkOperator {
      * @param operator name of the operator
      */
     void deleteOpt(StreamSinkEntity entity, String operator);
+
+    /**
+     * Parse stream sink to id params.
+     *
+     * @param streamSink stream sink entity to parse
+     * @param fields list of fields passed along with the sink - TODO confirm the exact content with callers
+     * @return id params as a key-value map
+     */
+    Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields);
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index f8eb9eff8..338ee3276 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -20,11 +20,11 @@ package org.apache.inlong.manager.service.sort;
 import org.apache.inlong.common.constant.ClusterSwitch;
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
-import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
@@ -34,11 +34,16 @@ import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.SortService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.apache.inlong.manager.service.node.DataNodeService;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.json.JSONObject;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -52,6 +57,8 @@ import org.springframework.transaction.annotation.Transactional;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Sort service test for {@link SortService}
@@ -85,6 +92,10 @@ public class SortServiceImplTest extends ServiceBaseTest {
     private InlongGroupService groupService;
     @Autowired
     private InlongStreamService streamService;
+    @Autowired
+    private DataNodeService dataNodeService;
+    @Autowired
+    private StreamSinkService streamSinkService;
 
     @Test
     @Order(1)
@@ -201,18 +212,18 @@ public class SortServiceImplTest extends ServiceBaseTest {
     }
 
+    /**
+     * Save a Hive data node named after the given task through {@code dataNodeService}.
+     *
+     * @param taskName name assigned to the data node
+     */
     private void prepareDataNode(String taskName) {
-        DataNodeEntity entity = new DataNodeEntity();
-        entity.setName(taskName);
-        entity.setType(TEST_SINK_TYPE);
-        entity.setExtParams("{\"paramKey1\":\"paramValue1\"}");
-        entity.setCreator(TEST_CREATOR);
-        entity.setInCharges(TEST_CREATOR);
-        Date now = new Date();
-        entity.setCreateTime(now);
-        entity.setModifyTime(now);
-        entity.setIsDeleted(InlongConstants.UN_DELETED);
-        entity.setVersion(InlongConstants.INITIAL_VERSION);
-        dataNodeEntityMapper.insert(entity);
+        HiveDataNodeRequest request = new HiveDataNodeRequest();
+        request.setUrl("test_hive_url");
+        request.setName(taskName);
+        request.setExtParams("{\"paramKey1\":\"paramValue1\",\"hdfsUgi\":\"test_hdfsUgi\"}");
+        request.setHdfsPath("testPath");
+        request.setHiveConfDir("testDir");
+        request.setWarehouse("testWareHouse");
+        request.setHdfsUgi("testUgi");
+        request.setInCharges(TEST_CREATOR);
+        request.setUsername("test_hive_user");
+        request.setToken("test_hive_token");
+        // saved through the service layer rather than inserted directly with the entity mapper
+        dataNodeService.save(request, TEST_CREATOR);
     }
 
     private void prepareGroupId(String groupId) {
@@ -265,7 +276,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
     private void prepareCluster(String clusterName) {
         InlongClusterEntity entity = new InlongClusterEntity();
         entity.setName(clusterName);
-        entity.setType(TEST_SINK_TYPE);
+        entity.setType(DataNodeType.HIVE);
         entity.setExtParams("{}");
         entity.setCreator(TEST_CREATOR);
         entity.setInCharges(TEST_CREATOR);
@@ -295,22 +306,19 @@ public class SortServiceImplTest extends ServiceBaseTest {
     }
 
+    /**
+     * Save a Hive sink for the given group through {@code streamSinkService}.
+     * The data node name, sink name and sort task name are all set to {@code taskName}.
+     *
+     * @param taskName name used for the sink, its data node and its sort task
+     * @param groupId inlong group id the sink belongs to
+     * @param clusterName inlong cluster name set on the sink
+     */
     private void prepareTask(String taskName, String groupId, String clusterName) {
-        StreamSinkEntity entity = new StreamSinkEntity();
-        entity.setInlongGroupId(groupId);
-        entity.setInlongStreamId("1");
-        entity.setSinkType(TEST_SINK_TYPE);
-        entity.setSinkName(taskName);
-        entity.setInlongClusterName(clusterName);
-        entity.setDataNodeName(taskName);
-        entity.setSortTaskName(taskName);
-        entity.setCreator(TEST_CREATOR);
-        Date now = new Date();
-        entity.setCreateTime(now);
-        entity.setModifyTime(now);
-        entity.setIsDeleted(InlongConstants.UN_DELETED);
-        entity.setVersion(InlongConstants.INITIAL_VERSION);
-        entity.setExtParams("{\"delimiter\":\"|\",\"dataType\":\"text\"}");
-        streamSinkEntityMapper.insert(entity);
+        SinkRequest request = new HiveSinkRequest();
+        request.setDataNodeName(taskName);
+        request.setSinkType(SinkType.HIVE);
+        request.setInlongClusterName(clusterName);
+        request.setSinkName(taskName);
+        request.setSortTaskName(taskName);
+        request.setInlongGroupId(groupId);
+        request.setInlongStreamId("1");
+        // same delimiter/dataType pair that the removed entity-based setup stored as an ext params string
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("delimiter", "|");
+        properties.put("dataType", "text");
+        request.setProperties(properties);
+        // saved through the service layer rather than inserted directly with the entity mapper
+        streamSinkService.save(request, TEST_CREATOR);
     }
 
-}
\ No newline at end of file
+}