You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/10 02:57:04 UTC

[inlong] 01/03: [INLONG-6491][Manager] Support getting backup info in getAllConfig (#6492)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f31a9d6da85f0251005115c610c698d35d93fd71
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Nov 10 10:00:09 2022 +0800

    [INLONG-6491][Manager] Support getting backup info in getAllConfig (#6492)
---
 .../repository/DataProxyConfigRepository.java      | 340 +++++++++++----------
 1 file changed, 171 insertions(+), 169 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index d49eb9a34..55a5e7c85 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -19,10 +19,13 @@ package org.apache.inlong.manager.service.repository;
 
 import com.google.common.base.Splitter;
 import com.google.gson.Gson;
+import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.constant.ClusterSwitch;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
@@ -34,20 +37,23 @@ import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
-import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
-import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
-import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
-import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
-import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
+import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
+import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
+import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
+import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
+import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
+import org.apache.inlong.manager.service.core.SortConfigLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,7 +61,6 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Repository;
 import org.springframework.transaction.annotation.Transactional;
 
-import javax.annotation.PostConstruct;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Date;
@@ -63,10 +68,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.annotation.PostConstruct;
+
 /**
  * DataProxyConfigRepository
  */
@@ -76,6 +84,7 @@ public class DataProxyConfigRepository implements IRepository {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class);
 
+    public static final String KEY_NAMESPACE = "namespace";
     public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
     public static final String KEY_BACKUP_TOPIC = "backup_topic";
     public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
@@ -104,6 +113,8 @@ public class DataProxyConfigRepository implements IRepository {
     private InlongGroupEntityMapper inlongGroupMapper;
     @Autowired
     private StreamSinkEntityMapper streamSinkMapper;
+    @Autowired
+    private SortConfigLoader sortConfigLoader;
 
     @PostConstruct
     public void initialize() {
@@ -195,47 +206,44 @@ public class DataProxyConfigRepository implements IRepository {
     @Override
     @Transactional(rollbackFor = Exception.class)
     public void reload() {
-        LOGGER.info("start to reload config.");
-        Map<String, ProxyClusterObject> proxyClusterMap = this.reloadProxyCluster();
+        LOGGER.info("start to reload config:" + this.getClass().getSimpleName());
+        // reload proxy cluster
+        Map<String, DataProxyCluster> proxyClusterMap = new HashMap<>();
+        this.reloadProxyCluster(proxyClusterMap);
         if (proxyClusterMap.size() == 0) {
             return;
         }
-        Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = this.reloadCacheCluster();
-        Map<String, List<InLongIdObject>> inlongIdMap = this.reloadInlongId();
-        // mapping inlongIdMap
-        for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
-            String clusterTag = entry.getValue().getSetName();
-            List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
-            if (inlongIds != null) {
-                entry.getValue().setInlongIds(inlongIds);
-            }
-        }
+        // reload cache cluster
+        this.reloadCacheCluster(proxyClusterMap);
+        // reload inlong group id and inlong stream id
+        this.reloadInlongId(proxyClusterMap);
 
         // generateClusterJson
-        this.generateClusterJson(proxyClusterMap, cacheClusterMap);
+        this.generateClusterJson(proxyClusterMap);
 
-        LOGGER.info("end to reload config.");
+        LOGGER.info("end to reload config:" + this.getClass().getSimpleName());
     }
 
     /**
      * reloadProxyCluster
      */
-    private Map<String, ProxyClusterObject> reloadProxyCluster() {
-        Map<String, ProxyClusterObject> proxyClusterMap = new HashMap<>();
+    private void reloadProxyCluster(Map<String, DataProxyCluster> proxyClusterMap) {
         for (ProxyCluster proxyCluster : clusterSetMapper.selectProxyCluster()) {
             ProxyClusterObject obj = new ProxyClusterObject();
             obj.setName(proxyCluster.getClusterName());
             obj.setSetName(proxyCluster.getClusterTag());
             obj.setZone(proxyCluster.getExtTag());
-            proxyClusterMap.put(obj.getName(), obj);
+            DataProxyCluster clusterObj = new DataProxyCluster();
+            clusterObj.setProxyCluster(obj);
+            proxyClusterMap.put(obj.getName(), clusterObj);
         }
-        return proxyClusterMap;
     }
 
     /**
      * reloadCacheCluster
      */
-    private Map<String, Map<String, List<CacheCluster>>> reloadCacheCluster() {
+    private void reloadCacheCluster(Map<String, DataProxyCluster> proxyClusterMap) {
+        // reload cache cluster
         Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = new HashMap<>();
         for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) {
             if (StringUtils.isEmpty(cacheCluster.getExtTag())) {
@@ -248,126 +256,160 @@ public class DataProxyConfigRepository implements IRepository {
                         .computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster);
             }
         }
-        return cacheClusterMap;
+        // mark cache cluster to proxy cluster
+        Map<String, Map<String, String>> tagCache = new HashMap<>();
+        for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+            DataProxyCluster clusterObj = entry.getValue();
+            ProxyClusterObject proxyObj = clusterObj.getProxyCluster();
+            // cache
+            String clusterTag = proxyObj.getSetName();
+            String extTag = proxyObj.getZone();
+            Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
+            if (cacheClusterZoneMap != null) {
+                Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));
+                for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
+                    if (cacheEntry.getValue().size() == 0) {
+                        continue;
+                    }
+                    Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(),
+                            k -> MAP_SPLITTER.split(cacheEntry.getKey()));
+                    if (isSubTag(wholeTagMap, subTagMap)) {
+                        CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
+                        cacheSet.setSetName(clusterTag);
+                        List<CacheCluster> cacheClusterList = cacheEntry.getValue();
+                        cacheSet.setType(cacheClusterList.get(0).getType());
+                        List<CacheClusterObject> cacheClusters = cacheSet.getCacheClusters();
+                        for (CacheCluster cacheCluster : cacheClusterList) {
+                            CacheClusterObject obj = new CacheClusterObject();
+                            obj.setName(cacheCluster.getClusterName());
+                            obj.setZone(cacheCluster.getExtTag());
+                            obj.setParams(fromJson(cacheCluster.getExtParams()));
+                            cacheClusters.add(obj);
+                        }
+                    }
+                }
+            }
+        }
     }
 
     /**
-     * reloadInlongId
+     * fromJson
      */
-    private Map<String, List<InLongIdObject>> reloadInlongId() {
-        // parse group
-        Map<String, InlongGroupId> groupIdMap = new HashMap<>();
-        clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
-        // parse stream
-        Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
-        for (InlongStreamId streamIdObj : clusterSetMapper.selectInlongStreamId()) {
-            String groupId = streamIdObj.getInlongGroupId();
-            InlongGroupId groupIdObj = groupIdMap.get(groupId);
-            if (groupId == null) {
-                continue;
+    private Map<String, String> fromJson(String jsonString) {
+        Map<String, String> mapObj = new HashMap<>();
+        try {
+            JsonObject obj = gson.fromJson(jsonString, JsonObject.class);
+            for (String key : obj.keySet()) {
+                JsonElement child = obj.get(key);
+                if (child.isJsonPrimitive()) {
+                    mapObj.put(key, child.getAsString());
+                } else {
+                    mapObj.put(key, child.toString());
+                }
             }
-            Map<String, String> groupParams = this.getExtParams(groupIdObj.getExtParams());
-            Map<String, String> streamParams = this.getExtParams(streamIdObj.getExtParams());
-            this.parseMasterTopic(groupIdObj, streamIdObj, groupParams, streamParams, inlongIdMap);
-            this.parseBackupTopic(groupIdObj, streamIdObj, groupParams, streamParams, inlongIdMap);
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
         }
-        return inlongIdMap;
+        return mapObj;
     }
 
     /**
-     * getExtParams
+     * reloadInlongId
      */
-    @SuppressWarnings("unchecked")
-    private Map<String, String> getExtParams(String extParams) {
-        // parse extparams
-        if (!StringUtils.isEmpty(extParams)) {
-            try {
-                Map<String, String> groupParams = gson.fromJson(extParams, HashMap.class);
-                return groupParams;
-            } catch (Exception e) {
-                LOGGER.error("Fail to parse ext error:{},params:{}", e.getMessage(), extParams, e);
+    private void reloadInlongId(Map<String, DataProxyCluster> proxyClusterMap) {
+        // reload inlong group id
+        Map<String, InlongGroupId> groupIdMap = new HashMap<>();
+        clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
+        // reload inlong group ext params
+        Map<String, Map<String, String>> groupParams = new HashMap<>();
+        groupIdMap.forEach((k, v) -> groupParams.put(k, fromJson(v.getExtParams())));
+        // reload inlong group ext
+        List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader
+                .loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
+        groupExtCursor.forEach(v -> groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>())
+                .put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue()));
+        // reload inlong stream id
+        Map<String, InlongStreamId> streamIdMap = new HashMap<>();
+        clusterSetMapper.selectInlongStreamId()
+                .forEach(v -> streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
+        // reload inlong stream ext params
+        Map<String, Map<String, String>> streamParams = new HashMap<>();
+        streamIdMap.forEach((k, v) -> streamParams.put(k, fromJson(v.getExtParams())));
+        // reload inlong stream ext
+        List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
+                .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
+        streamExtCursor.forEach(v -> streamParams
+                .computeIfAbsent(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), k -> new HashMap<>())
+                .put(ClusterSwitch.BACKUP_MQ_RESOURCE, v.getKeyValue()));
+
+        // build Map<clusterTag, List<InlongIdObject>>
+        Map<String, List<InLongIdObject>> inlongIdMap = this.parseInlongId(groupIdMap, groupParams, streamIdMap,
+                streamParams);
+        // mark inlong id to proxy cluster
+        for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+            String clusterTag = entry.getValue().getProxyCluster().getSetName();
+            List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
+            if (inlongIds != null) {
+                entry.getValue().getProxyCluster().getInlongIds().addAll(inlongIds);
             }
         }
-        return new HashMap<>();
     }
 
     /**
-     * parseMasterTopic
+     * parseInlongId
      */
-    private void parseMasterTopic(InlongGroupId groupIdObj, InlongStreamId streamIdObj,
-            Map<String, String> groupParams, Map<String, String> streamParams,
-            Map<String, List<InLongIdObject>> inlongIdMap) {
-        // choose topic
-        String groupTopic = groupIdObj.getTopic();
-        String streamTopic = streamIdObj.getTopic();
-        String finalTopic = null;
-        if (StringUtils.isEmpty(groupTopic)) {
-            // both empty then ignore
-            if (StringUtils.isEmpty(streamTopic)) {
-                return;
-            } else {
-                finalTopic = streamTopic;
+    private Map<String, List<InLongIdObject>> parseInlongId(Map<String, InlongGroupId> groupIdMap,
+            Map<String, Map<String, String>> groupParams, Map<String, InlongStreamId> streamIdMap,
+            Map<String, Map<String, String>> streamParams) {
+        Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
+        for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) {
+            InlongStreamId streamIdObj = entry.getValue();
+            String groupId = streamIdObj.getInlongGroupId();
+            InlongGroupId groupIdObj = groupIdMap.get(groupId);
+            if (groupIdObj == null) {
+                continue;
             }
-        } else {
-            if (StringUtils.isEmpty(streamTopic)) {
-                finalTopic = groupTopic;
+            // master: entry registered under the cluster tag of the stream's own group
+            InLongIdObject obj = new InLongIdObject();
+            String inlongId = entry.getKey();
+            obj.setInlongId(inlongId);
+            Optional.ofNullable(groupParams.get(groupId)).ifPresent(v -> obj.getParams().putAll(v));
+            Optional.ofNullable(streamParams.get(inlongId)).ifPresent(v -> obj.getParams().putAll(v));
+            if (StringUtils.isBlank(streamIdObj.getTopic())) {
+                obj.setTopic(groupIdObj.getTopic());
             } else {
-                // Pulsar: namespace+topic
-                finalTopic = groupTopic + "/" + streamTopic;
+                obj.setTopic(streamIdObj.getTopic());
+                obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic());
+            }
+            inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj);
+            // backup: separate entry registered under the backup cluster tag; the master entry stays untouched
+            InLongIdObject backupObj = new InLongIdObject();
+            backupObj.setInlongId(inlongId);
+            backupObj.getParams().putAll(obj.getParams());
+            Map<String, String> groupParam = groupParams.get(groupId);
+            if (groupParam != null && groupParam.containsKey(ClusterSwitch.BACKUP_CLUSTER_TAG)
+                    && groupParam.containsKey(ClusterSwitch.BACKUP_MQ_RESOURCE)) {
+                String clusterTag = groupParam.get(ClusterSwitch.BACKUP_CLUSTER_TAG);
+                String groupMqResource = groupParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE);
+
+                Map<String, String> streamParam = streamParams.get(inlongId);
+                if (streamParam != null && !StringUtils.isBlank(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE))) {
+                    backupObj.setTopic(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE));
+                    backupObj.getParams().put(KEY_NAMESPACE, groupMqResource);
+                } else {
+                    backupObj.setTopic(groupMqResource);
+                }
+                inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList<>()).add(backupObj);
             }
         }
-        // concat id
-        InLongIdObject obj = new InLongIdObject();
-        String inlongId = streamIdObj.getInlongGroupId() + "." + streamIdObj.getInlongStreamId();
-        obj.setInlongId(inlongId);
-        obj.setTopic(finalTopic);
-        Map<String, String> params = new HashMap<>();
-        params.putAll(groupParams);
-        params.putAll(streamParams);
-        obj.setParams(params);
-        inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj);
+        return inlongIdMap;
     }
 
     /**
-     * parseBackupTopic
+     * getInlongId
      */
-    private void parseBackupTopic(InlongGroupId groupIdObj, InlongStreamId streamIdObj,
-            Map<String, String> groupParams, Map<String, String> streamParams,
-            Map<String, List<InLongIdObject>> inlongIdMap) {
-        Map<String, String> params = new HashMap<>();
-        params.putAll(groupParams);
-        params.putAll(streamParams);
-        // find backup cluster tag
-        String clusterTag = params.get(KEY_BACKUP_CLUSTER_TAG);
-        if (StringUtils.isEmpty(clusterTag)) {
-            return;
-        }
-        // find backup topic
-        String groupTopic = groupParams.get(KEY_BACKUP_TOPIC);
-        String streamTopic = streamParams.get(KEY_BACKUP_TOPIC);
-        String finalTopic = null;
-        if (StringUtils.isEmpty(groupTopic)) {
-            // both empty then ignore
-            if (StringUtils.isEmpty(streamTopic)) {
-                return;
-            } else {
-                finalTopic = streamTopic;
-            }
-        } else {
-            if (StringUtils.isEmpty(streamTopic)) {
-                finalTopic = groupTopic;
-            } else {
-                // Pulsar: namespace+topic
-                finalTopic = groupTopic + "/" + streamTopic;
-            }
-        }
-        // concat id
-        InLongIdObject obj = new InLongIdObject();
-        String inlongId = streamIdObj.getInlongGroupId() + "." + streamIdObj.getInlongStreamId();
-        obj.setInlongId(inlongId);
-        obj.setTopic(finalTopic);
-        obj.setParams(params);
-        inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList<>()).add(obj);
+    private String getInlongId(String inlongGroupId, String inlongStreamId) {
+        return inlongGroupId + "." + inlongStreamId;
     }
 
     /**
@@ -382,62 +424,22 @@ public class DataProxyConfigRepository implements IRepository {
     /**
      * generateClusterJson
      */
-    @SuppressWarnings("unchecked")
-    private void generateClusterJson(Map<String, ProxyClusterObject> proxyClusterMap,
-            Map<String, Map<String, List<CacheCluster>>> cacheClusterMap) {
+    private void generateClusterJson(Map<String, DataProxyCluster> proxyClusterMap) {
         Map<String, String> newProxyConfigJson = new ConcurrentHashMap<>();
         Map<String, String> newProxyMd5Map = new ConcurrentHashMap<>();
-        Map<String, Map<String, String>> tagCache = new HashMap<>();
-        for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
-            ProxyClusterObject proxyObj = entry.getValue();
-            // proxy
-            DataProxyCluster clusterObj = new DataProxyCluster();
-            clusterObj.setProxyCluster(proxyObj);
-            // cache
-            String clusterTag = proxyObj.getSetName();
-            String extTag = proxyObj.getZone();
-            Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
-            if (cacheClusterZoneMap != null) {
-                Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));
-                for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
-                    if (cacheEntry.getValue().size() == 0) {
-                        continue;
-                    }
-                    Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(),
-                            k -> MAP_SPLITTER.split(cacheEntry.getKey()));
-                    if (isSubTag(wholeTagMap, subTagMap)) {
-                        CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
-                        cacheSet.setSetName(clusterTag);
-                        List<CacheCluster> cacheClusterList = cacheEntry.getValue();
-                        cacheSet.setType(cacheClusterList.get(0).getType());
-                        List<CacheClusterObject> cacheClusters = new ArrayList<>(cacheClusterList.size());
-                        cacheSet.setCacheClusters(cacheClusters);
-                        for (CacheCluster cacheCluster : cacheClusterList) {
-                            CacheClusterObject obj = new CacheClusterObject();
-                            obj.setName(cacheCluster.getClusterName());
-                            obj.setZone(cacheCluster.getExtTag());
-                            try {
-                                Map<String, String> params = gson.fromJson(cacheCluster.getExtParams(), Map.class);
-                                obj.setParams(params);
-                            } catch (Exception e) {
-                                LOGGER.error(e.getMessage(), e);
-                            }
-                            cacheClusters.add(obj);
-                        }
-                    }
-                }
-            }
+        for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+            DataProxyCluster proxyObj = entry.getValue();
             // json
-            String jsonDataProxyCluster = gson.toJson(clusterObj);
+            String jsonDataProxyCluster = gson.toJson(proxyObj);
             String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
             DataProxyConfigResponse response = new DataProxyConfigResponse();
             response.setResult(true);
             response.setErrCode(DataProxyConfigResponse.SUCC);
             response.setMd5(md5);
-            response.setData(clusterObj);
+            response.setData(proxyObj);
             String jsonResponse = gson.toJson(response);
-            newProxyConfigJson.put(proxyObj.getName(), jsonResponse);
-            newProxyMd5Map.put(proxyObj.getName(), md5);
+            newProxyConfigJson.put(entry.getKey(), jsonResponse);
+            newProxyMd5Map.put(entry.getKey(), md5);
         }
 
         // replace