You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/06 09:13:42 UTC

[inlong] branch master updated: [INLONG-4177][Manager] Refactor getClusterConfig API for Sort Standalone (#4200)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 44d007c31 [INLONG-4177][Manager] Refactor getClusterConfig API for Sort Standalone (#4200)
44d007c31 is described below

commit 44d007c310aee60ca6ff8f5090457ab16b4fa783
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Jul 6 17:13:36 2022 +0800

    [INLONG-4177][Manager] Refactor getClusterConfig API for Sort Standalone (#4200)
---
 .../common/pojo/sortstandalone/SortIdInfo.java}    |  24 +-
 .../common/pojo/sortstandalone/SortSinkInfo.java}  |  24 +-
 .../common/pojo/sortstandalone/SortTaskInfo.java}  |  25 +-
 .../manager/dao/mapper/DataNodeEntityMapper.java   |   2 +
 .../manager/dao/mapper/StreamSinkEntityMapper.java |  16 ++
 .../resources/mappers/DataNodeEntityMapper.xml     |  10 +
 .../resources/mappers/StreamSinkEntityMapper.xml   |  23 ++
 ...IdParamService.java => SortClusterService.java} |  16 +-
 .../core/impl/SortClusterConfigServiceImpl.java    |  41 ---
 .../service/core/impl/SortClusterServiceImpl.java  | 309 +++++++++++++++++++++
 .../manager/service/core/impl/SortServiceImpl.java |  88 +-----
 .../core/impl/SortTaskIdParamServiceImpl.java      |  57 ----
 .../core/impl/SortTaskSinkParamServiceImpl.java    |  52 ----
 .../web/controller/openapi/SortControllerTest.java |  30 +-
 14 files changed, 400 insertions(+), 317 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortIdInfo.java
similarity index 65%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortIdInfo.java
index 9cb601c83..2cbb26593 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortIdInfo.java
@@ -15,21 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.pojo.sortstandalone;
 
-import java.util.Map;
+import lombok.Data;
 
-/**
- * Sort sink params service.
- */
-public interface SortTaskSinkParamService {
-
-    /**
-     * Select all sink params by task name and sink type.
-     *
-     * @param taskName Name of task;
-     * @param sinkType Type of sink;
-     * @return map of sink params.
-     */
-    Map<String, String> selectByTaskNameAndType(String taskName, String sinkType);
+@Data
+public class SortIdInfo {
+    private static final long serialVersionUID = 1L;
+    String sortTaskName;
+    String inlongGroupId;
+    String inlongStreamId;
+    String extParams;
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortSinkInfo.java
similarity index 62%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortSinkInfo.java
index 88a783b50..38ce0f7c0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortSinkInfo.java
@@ -15,22 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.pojo.sortstandalone;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
+import lombok.Data;
 
-import java.util.List;
-
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
-
-    /**
-     * Select list of task by cluster name.
-     *
-     * @param clusterName Name of sort cluster.
-     * @return List of tasks, including task name and sink type.
-     */
-    List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName);
+@Data
+public class SortSinkInfo {
+    private static final long serialVersionUID = 1L;
+    String name;
+    String type;
+    String extParams;
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortTaskInfo.java
similarity index 67%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortTaskInfo.java
index 12e3e271f..19da29a2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortTaskInfo.java
@@ -15,21 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.pojo.sortstandalone;
 
-import java.util.List;
-import java.util.Map;
+import lombok.Data;
 
-/**
- * Sort id params service.
- */
-public interface SortTaskIdParamService {
-
-    /**
-     * Select all id params by task name.
-     *
-     * @param taskName WorkflowTask name.
-     * @return List of all id params.
-     */
-    List<Map<String, String>> selectByTaskName(String taskName);
+@Data
+public class SortTaskInfo {
+    private static final long serialVersionUID = 1L;
+    String sortClusterName;
+    String sortTaskName;
+    String sortConsumerGroup;
+    String sinkType;
+    String dataNodeName;
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
index 583a34125..70f05c7d0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.dao.mapper;
 
 import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortSinkInfo;
 import org.apache.inlong.manager.common.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.springframework.stereotype.Repository;
@@ -41,4 +42,5 @@ public interface DataNodeEntityMapper {
 
     int deleteById(Integer id);
 
+    List<SortSinkInfo> selectAllSinkParams();
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index b6f88b833..ed22bdad4 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.manager.dao.mapper;
 
 import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortIdInfo;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortTaskInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
@@ -113,4 +115,18 @@ public interface StreamSinkEntityMapper {
 
     int deleteByPrimaryKey(Integer id);
 
+    /**
+     * Select all tasks for sort-standalone
+     *
+     * @return All tasks
+     */
+    List<SortTaskInfo> selectAllTasks();
+
+    /**
+     * Select all id params for sort-standalone
+     *
+     * @return All id params
+     */
+    List<SortIdInfo> selectAllIdParams();
+
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
index 20109b878..2757aa0fe 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
@@ -163,4 +163,14 @@
         from data_node
         where id = #{id,jdbcType=INTEGER}
     </delete>
+
+    <select id="selectAllSinkParams" resultType="org.apache.inlong.manager.common.pojo.sortstandalone.SortSinkInfo">
+        select name,
+               type,
+               ext_params
+        from data_node
+        <where>
+            is_deleted = 0
+        </where>
+    </select>
 </mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 3c9e6ce54..d387ee774 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -457,5 +457,28 @@
             modify_time     = now()
         where id = #{id,jdbcType=INTEGER}
     </update>
+    <select id="selectAllTasks" resultType="org.apache.inlong.manager.common.pojo.sortstandalone.SortTaskInfo">
+        select inlong_cluster_name as sortClusterName,
+               sort_task_name,
+               sort_consumer_group,
+               sink_type,
+               data_node_name
+        from stream_sink
+        <where>
+            is_deleted = 0
+        </where>
+        group by
+        inlong_cluster_name, sort_task_name, sort_consumer_group, sink_type, data_node_name
+    </select>
+    <select id="selectAllIdParams" resultType="org.apache.inlong.manager.common.pojo.sortstandalone.SortIdInfo">
+        select sort_task_name,
+               inlong_group_id,
+               inlong_stream_id,
+               ext_params
+        from stream_sink
+        <where>
+            is_deleted = 0
+        </where>
+    </select>
 
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterService.java
similarity index 69%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterService.java
index 12e3e271f..dd98398bb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterService.java
@@ -17,19 +17,19 @@
 
 package org.apache.inlong.manager.service.core;
 
-import java.util.List;
-import java.util.Map;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 
 /**
- * Sort id params service.
+ * Sort cluster config interface.
  */
-public interface SortTaskIdParamService {
+public interface SortClusterService {
 
     /**
-     * Select all id params by task name.
+     * Get the cluster config response by specific cluster name.
      *
-     * @param taskName WorkflowTask name.
-     * @return List of all id params.
+     * @param clusterName Cluster name.
+     * @param md5 Last md5.
+     * @return Corresponding response.
      */
-    List<Map<String, String>> selectByTaskName(String taskName);
+    SortClusterResponse getClusterConfig(String clusterName, String md5);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
deleted file mode 100644
index 51f9dc3ed..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
-import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
-import org.apache.inlong.manager.service.core.SortClusterConfigService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.List;
-
-/**
- * Sort cluster config service implementation.
- */
-@Service
-public class SortClusterConfigServiceImpl implements SortClusterConfigService {
-
-    @Autowired
-    private SortClusterConfgiEntityMapper sortClusterConfgiEntityMapper;
-
-    @Override
-    public List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName) {
-        return sortClusterConfgiEntityMapper.selectTasksByClusterName(clusterName);
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
new file mode 100644
index 000000000..4b8bf2f04
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.impl;
+
+import com.google.gson.Gson;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortIdInfo;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortSinkInfo;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortTaskInfo;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.service.core.SortClusterService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.PostConstruct;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Used to cache the sort cluster configs and reduce the number of queries to the database.
+ */
+@Service
+public class SortClusterServiceImpl implements SortClusterService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SortClusterServiceImpl.class);
+
+    private static final Gson gson = new Gson();
+
+    public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
+
+    private static final int RESPONSE_CODE_SUCCESS = 0;
+    private static final int RESPONSE_CODE_NO_UPDATE = 1;
+    private static final int RESPONSE_CODE_FAIL = -1;
+    private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
+
+    private static final String KEY_GROUP_ID = "inlongGroupId";
+    private static final String KEY_STREAM_ID = "inlongStreamId";
+
+    // key : sort cluster name, value : md5
+    private Map<String, String> sortClusterMd5Map = new ConcurrentHashMap<>();
+    // key : sort cluster name, value : cluster config
+    private Map<String, SortClusterConfig> sortClusterConfigMap = new ConcurrentHashMap<>();
+    // key : sort cluster name, value : error log
+    private Map<String, String> sortClusterErrorLogMap = new ConcurrentHashMap<>();
+
+    private long reloadInterval;
+
+    @Autowired
+    private StreamSinkEntityMapper streamSinkEntityMapper;
+    @Autowired
+    private DataNodeEntityMapper dataNodeEntityMapper;
+
+    @PostConstruct
+    public void initialize() {
+        LOGGER.info("create repository for " + SortClusterServiceImpl.class.getSimpleName());
+        try {
+            this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS;
+            reload();
+            setReloadTimer();
+        } catch (Throwable t) {
+            LOGGER.error("Initialize SortClusterConfigRepository error", t);
+        }
+    }
+
+    @Transactional(rollbackFor = Exception.class)
+    public void reload() {
+        LOGGER.debug("start to reload sort config.");
+        try {
+            reloadAllClusterConfig();
+        } catch (Throwable t) {
+            LOGGER.error(t.getMessage(), t);
+        }
+        LOGGER.debug("end to reload config");
+    }
+
+    @Override
+    public SortClusterResponse getClusterConfig(String clusterName, String md5) {
+        // check if cluster name is valid or not.
+        if (StringUtils.isBlank(clusterName)) {
+            String errMsg = "Blank cluster name, return nothing";
+            LOGGER.info(errMsg);
+            return SortClusterResponse.builder()
+                    .msg(errMsg)
+                    .code(RESPONSE_CODE_REQ_PARAMS_ERROR)
+                    .build();
+        }
+
+        // if there is an error
+        if (sortClusterErrorLogMap.get(clusterName) != null) {
+            return SortClusterResponse.builder()
+                    .msg(sortClusterErrorLogMap.get(clusterName))
+                    .code(RESPONSE_CODE_FAIL)
+                    .build();
+        }
+
+        // there is no config
+        if (sortClusterConfigMap.get(clusterName) == null) {
+            String errMsg = "There is not config for cluster " + clusterName;
+            LOGGER.info(errMsg);
+            return SortClusterResponse.builder()
+                    .msg(errMsg)
+                    .code(RESPONSE_CODE_REQ_PARAMS_ERROR)
+                    .build();
+        }
+
+        // same md5 (null-safe: reload swaps the config and md5 maps separately, so an entry may be missing)
+        if (md5 != null && md5.equals(sortClusterMd5Map.get(clusterName))) {
+            return SortClusterResponse.builder()
+                    .msg("No update")
+                    .code(RESPONSE_CODE_NO_UPDATE)
+                    .md5(md5)
+                    .build();
+        }
+
+        return SortClusterResponse.builder()
+                .msg("Success")
+                .code(RESPONSE_CODE_SUCCESS)
+                .data(sortClusterConfigMap.get(clusterName))
+                .md5(sortClusterMd5Map.get(clusterName))
+                .build();
+    }
+
+    /**
+     * Reload all cluster config.
+     *
+     * <p>
+     *     The reload results, including config, md5 and error log, will replace the older ones.
+     * </p>
+     */
+    private void reloadAllClusterConfig() {
+        // get all task and group by cluster
+        List<SortTaskInfo> tasks = streamSinkEntityMapper.selectAllTasks();
+        Map<String, List<SortTaskInfo>> clusterTaskMap =
+                tasks.stream()
+                        .filter(dto -> dto.getSortClusterName() != null)
+                        .collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
+
+        // get all id params and group by task
+        List<SortIdInfo> idParams = streamSinkEntityMapper.selectAllIdParams();
+        Map<String, List<SortIdInfo>> taskIdParamMap =
+                idParams.stream()
+                        .filter(dto -> dto.getSortTaskName() != null)
+                        .collect(Collectors.groupingBy(SortIdInfo::getSortTaskName));
+
+        // get all sink params and group by data node name
+        List<SortSinkInfo> sinkParams = dataNodeEntityMapper.selectAllSinkParams();
+        Map<String, SortSinkInfo> taskSinkParamMap =
+                sinkParams.stream()
+                        .filter(dto -> dto.getName() != null)
+                        .collect(Collectors.toMap(SortSinkInfo::getName, param -> param));
+
+        // update config of each cluster
+        Map<String, SortClusterConfig> newConfigMap = new ConcurrentHashMap<>();
+        Map<String, String> newMd5Map = new ConcurrentHashMap<>();
+        Map<String, String> newErrorlogMap = new ConcurrentHashMap<>();
+        clusterTaskMap.forEach((clusterName, taskList) -> {
+            try {
+                // get config, then update config map and md5
+                SortClusterConfig clusterConfig =
+                        getConfigByClusterName(clusterName, taskList, taskIdParamMap, taskSinkParamMap);
+                String jsonStr = gson.toJson(clusterConfig);
+                String md5 = DigestUtils.md5Hex(jsonStr);
+                newConfigMap.put(clusterName, clusterConfig);
+                newMd5Map.put(clusterName, md5);
+            } catch (Throwable e) {
+                // record the failure; ConcurrentHashMap rejects null values, so never store a null message.
+                newErrorlogMap.put(clusterName, e.getMessage() != null ? e.getMessage() : e.toString());
+                LOGGER.error("Failed to update cluster config of {}, error is {}", clusterName, e.getMessage());
+                LOGGER.error(e.getMessage(), e);
+            }
+        });
+        sortClusterErrorLogMap = newErrorlogMap;
+        sortClusterConfigMap = newConfigMap;
+        sortClusterMd5Map = newMd5Map;
+    }
+
+    /**
+     * Get the latest config of specific cluster.
+     *
+     * @param clusterName Cluster name.
+     * @param tasks Task in this cluster.
+     * @param taskIdParamMap All id params.
+     * @param taskSinkParamMap All sink params.
+     * @return The sort cluster config of specific cluster.
+     */
+    private SortClusterConfig getConfigByClusterName(
+            String clusterName,
+            List<SortTaskInfo> tasks,
+            Map<String, List<SortIdInfo>> taskIdParamMap,
+            Map<String, SortSinkInfo> taskSinkParamMap) {
+
+        List<SortTaskConfig> taskConfigs = tasks.stream()
+                .map(task -> {
+                    String taskName = task.getSortTaskName();
+                    String type = task.getSinkType();
+                    List<SortIdInfo> idParams = taskIdParamMap.get(taskName);
+                    SortSinkInfo sinkParams = taskSinkParamMap.get(task.getDataNodeName());
+                    return this.getTaskConfig(taskName, type, idParams, sinkParams);
+                })
+                .collect(Collectors.toList());
+
+        return SortClusterConfig.builder()
+                .clusterName(clusterName)
+                .sortTasks(taskConfigs)
+                .build();
+    }
+
+    /**
+     * Get task config.
+     * <p>
+     *     If there are no id params or sink params, throw an exception to the caller.
+     * </p>
+     *
+     * @param taskName Task name.
+     * @param type Type of sink.
+     * @param idParams Id params.
+     * @param sinkParams Sink params.
+     * @return Task config.
+     */
+    private SortTaskConfig getTaskConfig(
+            String taskName,
+            String type,
+            List<SortIdInfo> idParams,
+            SortSinkInfo sinkParams) {
+
+        Optional.ofNullable(idParams)
+                .orElseThrow(() -> new IllegalStateException(("There is no any id params of task " + taskName)));
+        Optional.ofNullable(sinkParams)
+                .orElseThrow(() -> new IllegalStateException("There is no any sink params of task " + taskName));
+
+        if (!type.equalsIgnoreCase(sinkParams.getType())) {
+            throw new IllegalArgumentException(
+                    String.format("for task %s, task type %s and sink type %s are not identical",
+                            taskName, type, sinkParams.getType()));
+        }
+
+        return SortTaskConfig.builder()
+                .name(taskName)
+                .type(type)
+                .idParams(this.parseIdParams(idParams))
+                .sinkParams(this.parseSinkParams(sinkParams))
+                .build();
+
+    }
+
+    /**
+     * Parse id params from json.
+     * @param rowIdParams IdParams in json format.
+     * @return List of IdParams.
+     */
+    private List<Map<String, String>> parseIdParams(List<SortIdInfo> rowIdParams) {
+        return rowIdParams.stream()
+                .map(row -> {
+                    Map<String, String> param = gson.fromJson(row.getExtParams(), HashMap.class);
+                    // put group and stream info
+                    param.put(KEY_GROUP_ID, row.getInlongGroupId());
+                    param.put(KEY_STREAM_ID, row.getInlongStreamId());
+                    return param;
+                })
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Parse sink params from json.
+     * @param rowSinkParams Sink params in json format.
+     * @return Sink params.
+     */
+    private Map<String, String> parseSinkParams(SortSinkInfo rowSinkParams) {
+        return gson.fromJson(rowSinkParams.getExtParams(), HashMap.class);
+    }
+
+    /**
+     * Set reload timer at the beginning of repository.
+     */
+    private void setReloadTimer() {
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+        executorService.scheduleAtFixedRate(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS);
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 4b3406b57..a7f30db83 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -22,23 +22,16 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.sdk.CacheZone;
 import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
-import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import org.apache.inlong.manager.service.core.SortClusterConfigService;
+import org.apache.inlong.manager.service.core.SortClusterService;
 import org.apache.inlong.manager.service.core.SortSourceService;
-import org.apache.inlong.manager.service.core.SortTaskIdParamService;
 import org.apache.inlong.manager.service.core.SortService;
-import org.apache.inlong.manager.service.core.SortTaskSinkParamService;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -54,88 +47,15 @@ public class SortServiceImpl implements SortService {
     private static final int RESPONSE_CODE_FAIL = -1;
     private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
 
-    @Autowired private SortClusterConfigService sortClusterConfigService;
-
-    @Autowired private SortTaskIdParamService sortTaskIdParamService;
-
-    @Autowired private SortTaskSinkParamService sortTaskSinkParamService;
 
     @Autowired private SortSourceService sortSourceService;
 
+    @Autowired private SortClusterService sortClusterService;
+
     @Override
     public SortClusterResponse getClusterConfig(String clusterName, String md5) {
-        LOGGER.info("start getClusterConfig");
-
-        // check if cluster name is valid or not.
-        if (StringUtils.isBlank(clusterName)) {
-            String errMsg = "Blank cluster name, return nothing";
-            LOGGER.info(errMsg);
-            return SortClusterResponse.builder().msg(errMsg).build();
-        }
-
-        // check if there is any task.
-        List<SortClusterConfigEntity> tasks = sortClusterConfigService.selectTasksByClusterName(clusterName);
-
-        if (tasks == null || tasks.isEmpty()) {
-            String errMsg = "There is not any task for cluster" + clusterName;
-            LOGGER.info(errMsg);
-            return SortClusterResponse.builder()
-                    .code(RESPONSE_CODE_REQ_PARAMS_ERROR)
-                    .msg(errMsg)
-                    .build();
-        }
-
-        // add task configs
-        List<SortTaskConfig> taskConfigs = new ArrayList<>();
-        try {
-            tasks.forEach(clusterConfig -> taskConfigs.add(this.getTaskConfig(clusterConfig)));
-        } catch (IllegalArgumentException ex) {
-            String errMsg = "Got illegal sink type from db, " + ex.getMessage();
-            LOGGER.info(errMsg);
-            return SortClusterResponse.builder()
-                    .code(RESPONSE_CODE_FAIL)
-                    .msg(errMsg)
-                    .build();
-        }
-
-        SortClusterConfig clusterConfig = SortClusterConfig.builder()
-                .clusterName(clusterName)
-                .sortTasks(taskConfigs)
-                .build();
-
-        JSONObject job = new JSONObject(clusterConfig);
-        String localMd5 = DigestUtils.md5Hex(job.toString());
-
-        // no update
-        if (localMd5.equals(md5)) {
-            return SortClusterResponse.builder()
-                    .code(RESPONSE_CODE_NO_UPDATE)
-                    .msg("No update")
-                    .md5(localMd5)
-                    .build();
-        }
-
-        return SortClusterResponse.builder()
-                .code(RESPONSE_CODE_SUCCESS)
-                .data(clusterConfig)
-                .msg("success")
-                .md5(localMd5)
-                .build();
-    }
 
-    private SortTaskConfig getTaskConfig(SortClusterConfigEntity clusterConfig) {
-        String sinkType = clusterConfig.getSinkType().toUpperCase();
-        List<Map<String, String>> idParams =
-                sortTaskIdParamService.selectByTaskName(clusterConfig.getTaskName());
-        Map<String, String> sinkParams =
-                sortTaskSinkParamService
-                        .selectByTaskNameAndType(clusterConfig.getTaskName(), clusterConfig.getSinkType());
-        return SortTaskConfig.builder()
-                .name(clusterConfig.getTaskName())
-                .type(sinkType)
-                .idParams(idParams)
-                .sinkParams(sinkParams)
-                .build();
+        return sortClusterService.getClusterConfig(clusterName, md5);
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
deleted file mode 100644
index cab343d49..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import org.apache.inlong.manager.dao.entity.SortTaskIdParamEntity;
-import org.apache.inlong.manager.dao.mapper.SortTaskIdParamEntityMapper;
-import org.apache.inlong.manager.service.core.SortTaskIdParamService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Sort id params service implementation.
- */
-@Service
-public class SortTaskIdParamServiceImpl implements SortTaskIdParamService {
-
-    private static final String KEY_GROUP_ID = "inlongGroupId";
-
-    private static final String KEY_STREAM_ID = "inlongStreamId";
-
-    @Autowired private SortTaskIdParamEntityMapper sortTaskIdParamEntityMapper;
-
-    @Override
-    public List<Map<String, String>> selectByTaskName(String taskName) {
-        List<SortTaskIdParamEntity> taskIdParamEntityList =
-                sortTaskIdParamEntityMapper.selectByTaskName(taskName);
-        Map<String, Map<String, String>> idParams = new HashMap<>();
-        taskIdParamEntityList.forEach(entity -> {
-            Map<String, String> idParam =
-                    idParams.computeIfAbsent(entity.getKey(), key -> new HashMap<>());
-            idParam.put(entity.getParamKey(), entity.getParamValue());
-            idParam.putIfAbsent(KEY_GROUP_ID, entity.getGroupId());
-            idParam.putIfAbsent(KEY_STREAM_ID, entity.getStreamId());
-        });
-        return new ArrayList<>(idParams.values());
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
deleted file mode 100644
index 0cf5af26f..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
-import org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper;
-import org.apache.inlong.manager.service.core.SortTaskSinkParamService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Sort sink params service implementation.
- */
-@Service
-public class SortTaskSinkParamServiceImpl implements SortTaskSinkParamService {
-    private static final Logger LOGGER = LoggerFactory.getLogger(SortTaskSinkParamServiceImpl.class);
-
-    @Autowired
-    private SortTaskSinkParamEntityMapper sortTaskSinkParamEntityMapper;
-
-    @Override
-    public Map<String, String> selectByTaskNameAndType(String taskName, String sinkType) {
-        LOGGER.info("task name is {}, sink type is {}", taskName, sinkType);
-        List<SortTaskSinkParamEntity> taskSinkParamEntityList =
-                sortTaskSinkParamEntityMapper.selectByTaskNameAndType(taskName);
-        Map<String, String> sinkParams = new HashMap<>();
-        taskSinkParamEntityList.forEach(entity -> sinkParams.put(entity.getParamKey(), entity.getParamValue()));
-        return sinkParams;
-    }
-
-}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
index 305289377..aacadaa12 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.manager.web.controller.openapi;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
 import org.apache.inlong.manager.dao.entity.SortTaskIdParamEntity;
 import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
@@ -27,15 +25,11 @@ import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
 import org.apache.inlong.manager.dao.mapper.SortTaskIdParamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper;
 import org.apache.inlong.manager.web.WebBaseTest;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestInstance.Lifecycle;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.test.web.servlet.RequestBuilder;
-import org.springframework.transaction.annotation.Transactional;
 
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
@@ -70,29 +64,7 @@ class SortControllerTest extends WebBaseTest {
         sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask1", "kafka"));
         sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask2", "pulsar"));
     }
-
-    /**
-     * Test if the interface works.
-     *
-     * @throws Exception Exceptions to request generating.
-     */
-    @Test
-    @Transactional
-    void testGetSortClusterConfig() throws Exception {
-        MvcResult mvcResult = mockMvc.perform(
-                        get("/openapi/sort/getClusterConfig")
-                                .param("clusterName", "testCluster")
-                                .param("md5", "testMd5")
-                )
-                .andExpect(status().isOk())
-                .andReturn();
-
-        SortClusterResponse sortClusterResponse = JsonUtils.parseObject(mvcResult.getResponse().getContentAsString(),
-                SortClusterResponse.class);
-        Assertions.assertNotNull(sortClusterResponse);
-        Assertions.assertEquals("testCluster", sortClusterResponse.getData().getClusterName());
-    }
-
+    
     // @Test
     // @Transactional
     public void testErrorSinkType() throws Exception {