You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/06 09:58:10 UTC

[incubator-inlong] branch master updated: [INLONG-4521][DataProxy][Manager] Change the naming of third-party cluster related classes (#4526)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ee0eba3f [INLONG-4521][DataProxy][Manager] Change the naming of third-party cluster related classes (#4526)
3ee0eba3f is described below

commit 3ee0eba3f5ffd81e05c40d27e4550d4cd5eb9240
Author: healzhou <he...@gmail.com>
AuthorDate: Mon Jun 6 17:58:04 2022 +0800

    [INLONG-4521][DataProxy][Manager] Change the naming of third-party cluster related classes (#4526)
    
    * Change the naming of third-party cluster related classes
    
    * Fix code style and unit tests error
---
 .../common/pojo/dataproxy/DataProxyCluster.java    |  15 +-
 .../common/pojo/dataproxy/DataProxyConfig.java     |  50 ++-----
 ...ataProxyConfig.java => DataProxyTopicInfo.java} |  48 ++++--
 ...irdPartyClusterInfo.java => MQClusterInfo.java} |   6 +-
 .../pojo/dataproxy/ThirdPartyClusterDTO.java       |  43 ------
 inlong-dataproxy/bin/dataproxy-start.sh            |   2 +-
 .../inlong/dataproxy/config/ConfigManager.java     | 142 +++++++++---------
 .../inlong/dataproxy/config/RemoteConfigJson.java  |   6 +-
 .../dataproxy/config/RemoteConfigManager.java      | 120 ++++++---------
 .../config/holder/CommonPropertiesHolder.java      |  59 ++++----
 ...onfigHolder.java => MQClusterConfigHolder.java} |  27 ++--
 .../config/holder/PropertiesConfigHolder.java      |   2 +-
 .../ClassResourceCommonPropertiesLoader.java       |  42 +++---
 .../config/loader/CommonPropertiesLoader.java      |  14 +-
 ...artyClusterConfig.java => MQClusterConfig.java} | 162 +++++++++++----------
 .../dataproxy/consts/AttributeConstants.java       |  59 ++++----
 .../inlong/dataproxy/consts/ConfigConstants.java   |   4 +
 .../org/apache/inlong/dataproxy/http/Context.java  |   2 +-
 .../inlong/dataproxy/http/HttpBaseSource.java      |  53 +++----
 .../inlong/dataproxy/http/HttpSourceConstants.java |  30 ----
 .../inlong/dataproxy/http/MappedContext.java       |   7 +-
 .../inlong/dataproxy/http/MessageFilter.java       |  37 ++---
 .../inlong/dataproxy/http/MessageHandler.java      |   3 +-
 .../dataproxy/http/MessageProcessServlet.java      |  34 ++---
 .../inlong/dataproxy/http/SimpleHttpSource.java    |   3 +-
 .../dataproxy/http/SimpleMessageHandler.java       | 153 ++++++-------------
 .../apache/inlong/dataproxy/http/StatusCode.java   |   8 +-
 .../apache/inlong/dataproxy/sink/PulsarSink.java   |  16 +-
 .../apache/inlong/dataproxy/sink/SinkContext.java  |  48 +++---
 .../org/apache/inlong/dataproxy/sink/TubeSink.java | 125 ++++------------
 .../dataproxy/sink/pulsar/PulsarClientService.java |   8 +-
 .../inlong/dataproxy/sink/pulsar/SinkTask.java     |   4 +-
 .../TestMQClusterConfigLoader.java}                |  29 ++--
 .../TestClassResourceCommonPropertiesLoader.java   |  21 ++-
 .../TestContextCacheClusterConfigLoader.java       |  43 +++---
 .../loader/TestContextIdTopicConfigLoader.java     |  42 +++---
 .../metrics/TestDataProxyMetricItemSet.java        |   6 +-
 .../metrics/TestMetricListenerRunnable.java        |   6 +-
 .../apache/inlong/dataproxy/utils/MockUtils.java   |   6 +-
 .../src/test/resources/common.properties           |  29 ++--
 .../src/test/resources/dataproxy-pulsar.conf       |  27 ++--
 ...ty_cluster.properties => mq_cluster.properties} |   7 +-
 .../src/test/resources/topics.properties           |   2 +-
 .../manager/service/core/InlongClusterService.java |   4 +-
 .../core/impl/InlongClusterServiceImpl.java        |  20 +--
 .../manager/service/mq/util/PulsarUtils.java       |   6 +-
 .../controller/openapi/DataProxyController.java    |  11 +-
 47 files changed, 657 insertions(+), 934 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyCluster.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyCluster.java
index c293c1144..344caf330 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyCluster.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyCluster.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -21,12 +21,13 @@ package org.apache.inlong.common.pojo.dataproxy;
  * DataProxyCluster
  */
 public class DataProxyCluster {
+
     private ProxyClusterObject proxyCluster = new ProxyClusterObject();
     private CacheClusterSetObject cacheClusterSet = new CacheClusterSetObject();
 
     /**
      * get proxyCluster
-     * 
+     *
      * @return the proxyCluster
      */
     public ProxyClusterObject getProxyCluster() {
@@ -35,7 +36,7 @@ public class DataProxyCluster {
 
     /**
      * set proxyCluster
-     * 
+     *
      * @param proxyCluster the proxyCluster to set
      */
     public void setProxyCluster(ProxyClusterObject proxyCluster) {
@@ -44,7 +45,7 @@ public class DataProxyCluster {
 
     /**
      * get cacheClusterSet
-     * 
+     *
      * @return the cacheClusterSet
      */
     public CacheClusterSetObject getCacheClusterSet() {
@@ -53,7 +54,7 @@ public class DataProxyCluster {
 
     /**
      * set cacheClusterSet
-     * 
+     *
      * @param cacheClusterSet the cacheClusterSet to set
      */
     public void setCacheClusterSet(CacheClusterSetObject cacheClusterSet) {
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
index 4b0b6551a..66b8bd74b 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
@@ -17,54 +17,32 @@
 
 package org.apache.inlong.common.pojo.dataproxy;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
- * DataProxy config
+ * Data proxy config, includes mq clusters and topic list.
  */
 public class DataProxyConfig {
 
-    private String topic;
-    private String m;
-    private String inlongGroupId;
-
-    public DataProxyConfig() {
-    }
-
-    public DataProxyConfig(String topic, String m, String inlongGroupId) {
-        this.topic = topic;
-        this.m = m;
-        this.inlongGroupId = inlongGroupId;
-    }
-
-    @Override
-    public String toString() {
-        return "DataProxyConfig{topic='" + topic + '\''
-                + ", m='" + m + '\''
-                + ", inlongGroupId='" + inlongGroupId + '\''
-                + '}';
-    }
+    private List<MQClusterInfo> mqClusterList = new ArrayList<>();
 
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
+    private List<DataProxyTopicInfo> topicList = new ArrayList<>();
 
-    public String getM() {
-        return m;
+    public List<MQClusterInfo> getMqClusterList() {
+        return mqClusterList;
     }
 
-    public void setM(String m) {
-        this.m = m;
+    public void setMqClusterList(List<MQClusterInfo> mqClusterList) {
+        this.mqClusterList = mqClusterList;
     }
 
-    public String getInlongGroupId() {
-        return inlongGroupId;
+    public List<DataProxyTopicInfo> getTopicList() {
+        return topicList;
     }
 
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId;
+    public void setTopicList(List<DataProxyTopicInfo> topicList) {
+        this.topicList = topicList;
     }
 
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
similarity index 72%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
index 4b0b6551a..d1deaa4f1 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
@@ -18,28 +18,44 @@
 package org.apache.inlong.common.pojo.dataproxy;
 
 /**
- * DataProxy config
+ * Topic info for DataProxy, includes the topic name and the inlongGroupId to which it belongs.
  */
-public class DataProxyConfig {
+public class DataProxyTopicInfo {
 
+    /**
+     * The topic name that needs to send data
+     */
     private String topic;
-    private String m;
+
+    /**
+     * The inlongGroupId to which the topic belongs
+     */
     private String inlongGroupId;
 
-    public DataProxyConfig() {
+    /**
+     * The data format, will deprecate in the future
+     */
+    @Deprecated
+    private String m;
+
+    public DataProxyTopicInfo() {
     }
 
-    public DataProxyConfig(String topic, String m, String inlongGroupId) {
+    public DataProxyTopicInfo(String topic, String inlongGroupId) {
+        this(topic, inlongGroupId, null);
+    }
+
+    public DataProxyTopicInfo(String topic, String inlongGroupId, String m) {
         this.topic = topic;
-        this.m = m;
         this.inlongGroupId = inlongGroupId;
+        this.m = m;
     }
 
     @Override
     public String toString() {
-        return "DataProxyConfig{topic='" + topic + '\''
-                + ", m='" + m + '\''
+        return "DataProxyTopicInfo{topic='" + topic + '\''
                 + ", inlongGroupId='" + inlongGroupId + '\''
+                + ", m='" + m + '\''
                 + '}';
     }
 
@@ -51,14 +67,6 @@ public class DataProxyConfig {
         this.topic = topic;
     }
 
-    public String getM() {
-        return m;
-    }
-
-    public void setM(String m) {
-        this.m = m;
-    }
-
     public String getInlongGroupId() {
         return inlongGroupId;
     }
@@ -67,4 +75,12 @@ public class DataProxyConfig {
         this.inlongGroupId = inlongGroupId;
     }
 
+    public String getM() {
+        return m;
+    }
+
+    public void setM(String m) {
+        this.m = m;
+    }
+
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
similarity index 96%
rename from inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterInfo.java
rename to inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
index 7a635263b..7b7dd3179 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
@@ -20,7 +20,11 @@ package org.apache.inlong.common.pojo.dataproxy;
 import java.util.HashMap;
 import java.util.Map;
 
-public class ThirdPartyClusterInfo {
+/**
+ * MQ cluster info.
+ */
+public class MQClusterInfo {
+
     private String url;
     private String token;
     private Map<String, String> params = new HashMap<>();
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterDTO.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterDTO.java
deleted file mode 100644
index 87e573eb3..000000000
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterDTO.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.pojo.dataproxy;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ThirdPartyClusterDTO {
-
-    private List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
-    private List<DataProxyConfig> topicList = new ArrayList<>();
-
-    public List<ThirdPartyClusterInfo> getMqSet() {
-        return mqSet;
-    }
-
-    public void setMqSet(List<ThirdPartyClusterInfo> mqSet) {
-        this.mqSet = mqSet;
-    }
-
-    public List<DataProxyConfig> getTopicList() {
-        return topicList;
-    }
-
-    public void setTopicList(List<DataProxyConfig> topicList) {
-        this.topicList = topicList;
-    }
-}
diff --git a/inlong-dataproxy/bin/dataproxy-start.sh b/inlong-dataproxy/bin/dataproxy-start.sh
index fcf02a9d1..a0de3894c 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -32,7 +32,7 @@ error() {
   fi
 }
 
-for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties,third_party_cluster.properties}
+for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties,mq_cluster.properties}
 do
   if [ ! -f "$i" ]; then
     touch "$i"
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index b2f99ad5f..ee591f614 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -27,15 +27,16 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterInfo;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
 import org.apache.inlong.dataproxy.config.holder.FileConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.GroupIdPropertiesHolder;
+import org.apache.inlong.dataproxy.config.holder.MQClusterConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.MxPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.PropertiesConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.ThirdPartyClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,36 +46,29 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * Config manager class.
+ */
 public class ConfigManager {
 
-    public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<ConfigHolder>();
-    private static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class);
+    public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<>();
     private static volatile boolean isInit = false;
-
     private static ConfigManager instance = null;
 
-    private final PropertiesConfigHolder commonConfig =
-            new PropertiesConfigHolder("common.properties");
-    private final PropertiesConfigHolder topicConfig =
-            new PropertiesConfigHolder("topics.properties");
-    private final ThirdPartyClusterConfigHolder thirdPartyClusterConfigHolder =
-            new ThirdPartyClusterConfigHolder("third_party_cluster.properties");
+    private final PropertiesConfigHolder commonConfig = new PropertiesConfigHolder("common.properties");
+    private final MQClusterConfigHolder mqClusterConfigHolder = new MQClusterConfigHolder("mq_cluster.properties");
+    private final PropertiesConfigHolder topicConfig = new PropertiesConfigHolder("topics.properties");
     private final MxPropertiesHolder mxConfig = new MxPropertiesHolder("mx.properties");
-    private final GroupIdPropertiesHolder groupIdConfig =
-            new GroupIdPropertiesHolder("groupid_mapping.properties");
-    private final PropertiesConfigHolder dcConfig =
-            new PropertiesConfigHolder("dc_mapping.properties");
-    private final PropertiesConfigHolder transferConfig =
-            new PropertiesConfigHolder("transfer.properties");
-    private final PropertiesConfigHolder tubeSwitchConfig =
-            new PropertiesConfigHolder("tube_switch.properties");
-    private final PropertiesConfigHolder weightHolder =
-            new PropertiesConfigHolder("weight.properties");
-    private final FileConfigHolder blackListConfig =
-            new FileConfigHolder("blacklist.properties");
+
+    private final GroupIdPropertiesHolder groupIdConfig = new GroupIdPropertiesHolder("groupid_mapping.properties");
+    private final PropertiesConfigHolder dcConfig = new PropertiesConfigHolder("dc_mapping.properties");
+    private final PropertiesConfigHolder transferConfig = new PropertiesConfigHolder("transfer.properties");
+    private final PropertiesConfigHolder tubeSwitchConfig = new PropertiesConfigHolder("tube_switch.properties");
+    private final PropertiesConfigHolder weightHolder = new PropertiesConfigHolder("weight.properties");
+    private final FileConfigHolder blackListConfig = new FileConfigHolder("blacklist.properties");
 
     /**
-     * get instance for manager
+     * get instance for config manager
      */
     public static ConfigManager getInstance() {
         if (isInit && instance != null) {
@@ -145,8 +139,8 @@ public class ConfigManager {
         return updatePropertiesHolder(result, topicConfig, false);
     }
 
-    public boolean updateThirdPartyClusterProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, thirdPartyClusterConfigHolder, true);
+    public boolean updateMQClusterProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, mqClusterConfigHolder, true);
     }
 
     public Map<String, String> getMxProperties() {
@@ -197,16 +191,16 @@ public class ConfigManager {
         return topicConfig;
     }
 
-    public ThirdPartyClusterConfigHolder getThirdPartyClusterHolder() {
-        return thirdPartyClusterConfigHolder;
+    public MQClusterConfigHolder getMqClusterHolder() {
+        return mqClusterConfigHolder;
     }
 
-    public ThirdPartyClusterConfig getThirdPartyClusterConfig() {
-        return thirdPartyClusterConfigHolder.getClusterConfig();
+    public MQClusterConfig getMqClusterConfig() {
+        return mqClusterConfigHolder.getClusterConfig();
     }
 
-    public Map<String, String> getThirdPartyClusterUrl2Token() {
-        return thirdPartyClusterConfigHolder.getUrl2token();
+    public Map<String, String> getMqClusterUrl2Token() {
+        return mqClusterConfigHolder.getUrl2token();
     }
 
     /**
@@ -219,16 +213,16 @@ public class ConfigManager {
         private final CloseableHttpClient httpClient;
         private final Gson gson = new Gson();
         private boolean isRunning = true;
-        
-        public static ReloadConfigWorker create(ConfigManager managerInstance) {
-            return new ReloadConfigWorker(managerInstance);
-        }
 
         private ReloadConfigWorker(ConfigManager managerInstance) {
             this.configManager = managerInstance;
             this.httpClient = constructHttpClient();
         }
 
+        public static ReloadConfigWorker create(ConfigManager managerInstance) {
+            return new ReloadConfigWorker(managerInstance);
+        }
+
         private synchronized CloseableHttpClient constructHttpClient() {
             long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
             RequestConfig requestConfig = RequestConfig.custom()
@@ -244,15 +238,14 @@ public class ConfigManager {
         }
 
         private long getSleepTime() {
-            String sleepTimeInMsStr =
-                    configManager.getCommonProperties().get("configCheckInterval");
+            String sleepTimeInMsStr = configManager.getCommonProperties().get("configCheckInterval");
             long sleepTimeInMs = 10000;
             try {
                 if (sleepTimeInMsStr != null) {
                     sleepTimeInMs = Long.parseLong(sleepTimeInMsStr);
                 }
-            } catch (Exception ignored) {
-                LOG.info("ignored Exception ", ignored);
+            } catch (Exception e) {
+                LOG.info("ignored exception ", e);
             }
             return sleepTimeInMs + getRandom(0, 5000);
         }
@@ -262,9 +255,7 @@ public class ConfigManager {
         }
 
         private void checkLocalFile() {
-
             for (ConfigHolder holder : CONFIG_HOLDER_LIST) {
-
                 boolean isChanged = holder.checkAndUpdateHolder();
                 if (isChanged) {
                     holder.executeCallbacks();
@@ -272,15 +263,16 @@ public class ConfigManager {
             }
         }
 
-        private boolean checkWithManager(String host, String proxyClusterName) {
+        private boolean checkWithManager(String host, String clusterName) {
+            if (StringUtils.isEmpty(clusterName)) {
+                LOG.error("proxyClusterName is null");
+                return false;
+            }
+
             HttpGet httpGet = null;
             try {
-                if (StringUtils.isEmpty(proxyClusterName)) {
-                    LOG.error("proxyClusterName is null");
-                    return false;
-                }
-                String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getConfig_v2?clusterName="
-                        + proxyClusterName;
+                String url = "http://" + host + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_GET_CONFIG_PATH
+                        + "?clusterName=" + clusterName;
                 LOG.info("start to request {} to get config info", url);
                 httpGet = new HttpGet(url);
                 httpGet.addHeader(HttpHeaders.CONNECTION, "close");
@@ -289,37 +281,35 @@ public class ConfigManager {
                 CloseableHttpResponse response = httpClient.execute(httpGet);
                 String returnStr = EntityUtils.toString(response.getEntity());
                 // get groupId <-> topic and m value.
-
                 RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
-                Map<String, String> groupIdToTopic = new HashMap<String, String>();
-                Map<String, String> groupIdToMValue = new HashMap<String, String>();
-                Map<String, String> mqConfig = new HashMap<>();// include url2token and other params
-
-                if (configJson.isSuccess() && configJson.getData() != null) { //success get config
-                    LOG.info("getConfig_v2 result: {}", returnStr);
+                Map<String, String> groupIdToTopic = new HashMap<>();
+                Map<String, String> groupIdToMValue = new HashMap<>();
+                // include url2token and other params
+                Map<String, String> mqConfig = new HashMap<>();
+
+                // get config success
+                if (configJson.isSuccess() && configJson.getData() != null) {
+                    LOG.info("getConfig result: {}", returnStr);
                     /*
                      * get mqUrls <->token maps;
-                     * if mq is pulsar, store format: third-party-cluster.index1=cluster1url1,cluster1url2=token
-                     * if mq is tubemq, token is "", store format: third-party-cluster.index1=cluster1url1,cluster1url2=
+                     * if mq is pulsar, store format: mq_cluster.index1=cluster1url1,cluster1url2=token
+                     * if mq is tubemq, token is "", store format: mq_cluster.index1=cluster1url1,cluster1url2=
                      */
                     int index = 1;
-                    List<ThirdPartyClusterInfo> clusterSet = configJson.getData().getMqSet();
+                    List<MQClusterInfo> clusterSet = configJson.getData().getMqClusterList();
                     if (clusterSet == null || clusterSet.isEmpty()) {
                         LOG.error("getConfig from manager: no available mq config");
                         return false;
                     }
-                    for (ThirdPartyClusterInfo mqCluster : clusterSet) {
-                        String key = ThirdPartyClusterConfigHolder.URL_STORE_PREFIX + index;
-                        String value = mqCluster.getUrl() + AttributeConstants.KEY_VALUE_SEPARATOR
-                                + mqCluster.getToken();
+                    for (MQClusterInfo mqCluster : clusterSet) {
+                        String key = MQClusterConfigHolder.URL_STORE_PREFIX + index;
+                        String value =
+                                mqCluster.getUrl() + AttributeConstants.KEY_VALUE_SEPARATOR + mqCluster.getToken();
                         mqConfig.put(key, value);
                         ++index;
                     }
 
-                    // mq other params
-                    mqConfig.putAll(clusterSet.get(0).getParams());
-
-                    for (DataProxyConfig topic : configJson.getData().getTopicList()) {
+                    for (DataProxyTopicInfo topic : configJson.getData().getTopicList()) {
                         if (!StringUtils.isEmpty(topic.getM())) {
                             groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
                         }
@@ -329,14 +319,16 @@ public class ConfigManager {
                     }
                     configManager.addMxProperties(groupIdToMValue);
                     configManager.addTopicProperties(groupIdToTopic);
-                    configManager.updateThirdPartyClusterProperties(mqConfig);
+                    // other params for mq
+                    mqConfig.putAll(clusterSet.get(0).getParams());
+                    configManager.updateMQClusterProperties(mqConfig);
 
                     // store mq common configs and url2token
-                    configManager.getThirdPartyClusterConfig().putAll(mqConfig);
-                    configManager.getThirdPartyClusterHolder()
-                            .setUrl2token(configManager.getThirdPartyClusterHolder().getUrl2token());
+                    configManager.getMqClusterConfig().putAll(mqConfig);
+                    configManager.getMqClusterHolder()
+                            .setUrl2token(configManager.getMqClusterHolder().getUrl2token());
                 } else {
-                    LOG.error("getConfig from manager: {}", configJson.getErrMsg());
+                    LOG.error("getConfig from manager error: {}", configJson.getErrMsg());
                 }
             } catch (Exception ex) {
                 LOG.error("exception caught", ex);
@@ -358,7 +350,6 @@ public class ConfigManager {
                 }
                 String[] hostList = StringUtils.split(managerHosts, ",");
                 for (String host : hostList) {
-
                     if (checkWithManager(host, proxyClusterName)) {
                         break;
                     }
@@ -372,7 +363,6 @@ public class ConfigManager {
         public void run() {
             long count = 0;
             while (isRunning) {
-
                 long sleepTimeInMs = getSleepTime();
                 count += 1;
                 try {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
index 7182d2336..5073acdb6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
@@ -17,13 +17,13 @@
 
 package org.apache.inlong.dataproxy.config;
 
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 
 public class RemoteConfigJson {
 
     private boolean success;
     private String errMsg;
-    private ThirdPartyClusterDTO data;
+    private DataProxyConfig data;
 
     public boolean isSuccess() {
         return success;
@@ -33,7 +33,7 @@ public class RemoteConfigJson {
         return errMsg;
     }
 
-    public ThirdPartyClusterDTO getData() {
+    public DataProxyConfig getData() {
         return data;
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index 861e364e6..228d76bb9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -18,7 +18,6 @@
 package org.apache.inlong.dataproxy.config;
 
 import com.google.gson.Gson;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.http.HttpHeaders;
@@ -41,6 +40,7 @@ import org.apache.inlong.common.pojo.dataproxy.ProxySink;
 import org.apache.inlong.common.pojo.dataproxy.ProxySource;
 import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,25 +61,26 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class RemoteConfigManager implements IRepository {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteConfigManager.class);
-    public static final String KEY_CONFIG_CHECK_INTERVAL = "configCheckInterval";
     public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
-    public static final String KEY_SET_NAME = "set.name";
-    public static final char FLUME_SEPARATOR = '.';
+    private static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
+    private static final char FLUME_SEPARATOR = '.';
+    private static final String KEY_CONFIG_CHECK_INTERVAL = "configCheckInterval";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteConfigManager.class);
+    private static final Gson GSON = new Gson();
     private static volatile boolean isInit = false;
     private static RemoteConfigManager instance = null;
 
+    private final AtomicInteger managerIpListIndex = new AtomicInteger(0);
+    private final AtomicReference<DataProxyCluster> currentClusterConfigRef = new AtomicReference<>();
+    private String dataProxyConfigMd5;
+
     private long reloadInterval;
     private Timer reloadTimer;
 
-    //
     private IManagerIpListParser ipListParser;
     private CloseableHttpClient httpClient;
-    private Gson gson = new Gson();
-    private AtomicInteger managerIpListIndex = new AtomicInteger(0);
-    // config
-    private String dataProxyConfigMd5;
-    private AtomicReference<DataProxyCluster> currentClusterConfigRef = new AtomicReference<>();
+
     // flume properties
     private Map<String, String> flumeProperties;
     // inlong id map
@@ -90,7 +91,7 @@ public class RemoteConfigManager implements IRepository {
 
     /**
      * get instance for manager
-     * 
+     *
      * @return RemoteConfigManager
      */
     @SuppressWarnings("unchecked")
@@ -129,16 +130,29 @@ public class RemoteConfigManager implements IRepository {
     }
 
     /**
-     * reload
+     * constructHttpClient
+     */
+    private static synchronized CloseableHttpClient constructHttpClient() {
+        long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setConnectTimeout((int) timeoutInMs)
+                .setSocketTimeout((int) timeoutInMs).build();
+        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+        httpClientBuilder.setDefaultRequestConfig(requestConfig);
+        return httpClientBuilder.build();
+    }
+
+    /**
+     * Reload config
      */
     public void reload() {
-        LOGGER.info("start to reload config.");
+        LOGGER.info("start to reload config");
         String proxyClusterName = CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
-        String setName = CommonPropertiesHolder.getString(KEY_SET_NAME);
-        if (StringUtils.isBlank(proxyClusterName) || StringUtils.isBlank(setName)) {
+        String proxyClusterTag = CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_TAG);
+        if (StringUtils.isBlank(proxyClusterName) || StringUtils.isBlank(proxyClusterTag)) {
             return;
         }
-        //
+
         this.ipListParser.setCommonProperties(CommonPropertiesHolder.get());
         List<String> managerIpList = this.ipListParser.getIpList();
         if (managerIpList == null || managerIpList.size() == 0) {
@@ -147,12 +161,12 @@ public class RemoteConfigManager implements IRepository {
         int managerIpSize = managerIpList.size();
         for (int i = 0; i < managerIpList.size(); i++) {
             String host = managerIpList.get(Math.abs(managerIpListIndex.getAndIncrement()) % managerIpSize);
-            if (this.reloadDataProxyConfig(proxyClusterName, setName, host)) {
+            if (this.reloadDataProxyConfig(proxyClusterName, proxyClusterTag, host)) {
                 break;
             }
         }
 
-        LOGGER.info("end to reload config.");
+        LOGGER.info("success to reload config");
     }
 
     /**
@@ -166,15 +180,12 @@ public class RemoteConfigManager implements IRepository {
 
     /**
      * reloadDataProxyConfig
-     * 
-     * @param  host
-     * @return
      */
-    private boolean reloadDataProxyConfig(String proxyClusterName, String setName, String host) {
+    private boolean reloadDataProxyConfig(String clusterName, String clusterTag, String host) {
         HttpGet httpGet = null;
         try {
-            String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getAllConfig?clusterName="
-                    + proxyClusterName + "&setName=" + setName;
+            String url = "http://" + host + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_GET_ALL_CONFIG_PATH
+                    + "?clusterName=" + clusterName + "&clusterTag=" + clusterTag;
             if (StringUtils.isNotBlank(this.dataProxyConfigMd5)) {
                 url += "&md5=" + this.dataProxyConfigMd5;
             }
@@ -188,7 +199,7 @@ public class RemoteConfigManager implements IRepository {
             LOGGER.info("end to request {} to get config info:{}", url, returnStr);
             // get groupId <-> topic and m value.
 
-            DataProxyConfigResponse proxyResponse = gson.fromJson(returnStr, DataProxyConfigResponse.class);
+            DataProxyConfigResponse proxyResponse = GSON.fromJson(returnStr, DataProxyConfigResponse.class);
             if (!proxyResponse.isResult()) {
                 LOGGER.info("Fail to get config info from url:{}, error code is {}", url, proxyResponse.getErrCode());
                 return false;
@@ -216,24 +227,9 @@ public class RemoteConfigManager implements IRepository {
         return true;
     }
 
-    /**
-     * constructHttpClient
-     * 
-     * @return
-     */
-    private static synchronized CloseableHttpClient constructHttpClient() {
-        long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
-        RequestConfig requestConfig = RequestConfig.custom()
-                .setConnectTimeout((int) timeoutInMs)
-                .setSocketTimeout((int) timeoutInMs).build();
-        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
-        httpClientBuilder.setDefaultRequestConfig(requestConfig);
-        return httpClientBuilder.build();
-    }
-
     /**
      * getZone
-     * 
+     *
      * @return
      */
     public String getZone() {
@@ -245,9 +241,7 @@ public class RemoteConfigManager implements IRepository {
     }
 
     /**
-     * getProxyClusterName
-     * 
-     * @return
+     * Get proxy cluster name
      */
     public String getProxyClusterName() {
         DataProxyCluster currentClusterConfig = currentClusterConfigRef.get();
@@ -258,16 +252,14 @@ public class RemoteConfigManager implements IRepository {
     }
 
     /**
-     * getProxyClusterName
-     * 
-     * @return
+     * Get proxy cluster tag
      */
-    public String getSetName() {
+    public String getProxyClusterTag() {
         DataProxyCluster currentClusterConfig = currentClusterConfigRef.get();
         if (currentClusterConfig != null) {
             return currentClusterConfig.getProxyCluster().getSetName();
         }
-        return CommonPropertiesHolder.getString(KEY_SET_NAME);
+        return CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_TAG);
     }
 
     /**
@@ -285,7 +277,6 @@ public class RemoteConfigManager implements IRepository {
     }
 
     /**
-     * 
      * generateFlumeProperties
      */
     private void generateFlumeProperties() {
@@ -302,9 +293,6 @@ public class RemoteConfigManager implements IRepository {
 
     /**
      * generateFlumeChannels
-     * 
-     * @param proxyClusterObject
-     * @param newConfig
      */
     private void generateFlumeChannels(Map<String, String> newConfig) {
         StringBuilder builder = new StringBuilder();
@@ -341,9 +329,6 @@ public class RemoteConfigManager implements IRepository {
 
     /**
      * generateFlumeSink
-     * 
-     * @param proxyClusterObject
-     * @param newConfig
      */
     private void generateFlumeSinks(Map<String, String> newConfig) {
         StringBuilder builder = new StringBuilder();
@@ -414,9 +399,6 @@ public class RemoteConfigManager implements IRepository {
 
     /**
      * generateFlumeSources
-     * 
-     * @param proxyClusterObject
-     * @param newConfig
      */
     private void generateFlumeSources(Map<String, String> newConfig) {
         StringBuilder builder = new StringBuilder();
@@ -468,8 +450,6 @@ public class RemoteConfigManager implements IRepository {
 
     /**
      * getFlumeProperties
-     * 
-     * @return
      */
     public Map<String, String> getFlumeProperties() {
         return flumeProperties;
@@ -477,28 +457,20 @@ public class RemoteConfigManager implements IRepository {
 
     /**
      * getInlongIdMap
-     * 
-     * @return
      */
     public Map<String, InLongIdObject> getInlongIdMap() {
         return inlongIdMap;
     }
 
     /**
-     * 
      * getCurrentClusterConfig
-     * 
-     * @return
      */
     public DataProxyCluster getCurrentClusterConfig() {
-        DataProxyCluster currentClusterConfig = currentClusterConfigRef.get();
-        return currentClusterConfig;
+        return currentClusterConfigRef.get();
     }
 
     /**
      * get currentClusterConfigRef
-     * 
-     * @return the currentClusterConfigRef
      */
     public AtomicReference<DataProxyCluster> getCurrentClusterConfigRef() {
         return currentClusterConfigRef;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index a811db310..137d5b7f1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * 
  * CommonPropertiesHolder
  */
 public class CommonPropertiesHolder {
@@ -38,7 +37,7 @@ public class CommonPropertiesHolder {
     public static final Logger LOG = LoggerFactory.getLogger(CommonPropertiesHolder.class);
     public static final String KEY_COMMON_PROPERTIES = "common-properties-loader";
     public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
-    public static final String KEY_CLUSTER_ID = "proxy.cluster.name";
+    public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
     public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
     public static final boolean DEFAULT_RESPONSE_AFTER_SAVE = false;
     public static final String KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
@@ -85,7 +84,7 @@ public class CommonPropertiesHolder {
 
     /**
      * get props
-     * 
+     *
      * @return the props
      */
     public static Map<String, String> get() {
@@ -98,10 +97,10 @@ public class CommonPropertiesHolder {
 
     /**
      * Gets value mapped to key, returning defaultValue if unmapped.
-     * 
-     * @param  key          to be found
-     * @param  defaultValue returned if key is unmapped
-     * @return              value associated with key
+     *
+     * @param key to be found
+     * @param defaultValue returned if key is unmapped
+     * @return value associated with key
      */
     public static String getString(String key, String defaultValue) {
         return get().getOrDefault(key, defaultValue);
@@ -109,9 +108,9 @@ public class CommonPropertiesHolder {
 
     /**
      * Gets value mapped to key, returning null if unmapped.
-     * 
-     * @param  key to be found
-     * @return     value associated with key or null if unmapped
+     *
+     * @param key to be found
+     * @return value associated with key or null if unmapped
      */
     public static String getString(String key) {
         return get().get(key);
@@ -119,10 +118,10 @@ public class CommonPropertiesHolder {
 
     /**
      * getStringFromContext
-     * 
-     * @param  context
-     * @param  key
-     * @param  defaultValue
+     *
+     * @param context
+     * @param key
+     * @param defaultValue
      * @return
      */
     public static String getStringFromContext(Context context, String key, String defaultValue) {
@@ -133,10 +132,10 @@ public class CommonPropertiesHolder {
 
     /**
      * Gets value mapped to key, returning defaultValue if unmapped.
-     * 
-     * @param  key          to be found
-     * @param  defaultValue returned if key is unmapped
-     * @return              value associated with key
+     *
+     * @param key to be found
+     * @param defaultValue returned if key is unmapped
+     * @return value associated with key
      */
     public static Integer getInteger(String key, Integer defaultValue) {
         String value = get().get(key);
@@ -153,9 +152,9 @@ public class CommonPropertiesHolder {
      * mapped to a value and by returning the primitive object wrapper we can return null. If the key does not exist the
      * return value of this method is assigned directly to a primitive, a {@link NullPointerException} will be thrown.
      * </p>
-     * 
-     * @param  key to be found
-     * @return     value associated with key or null if unmapped
+     *
+     * @param key to be found
+     * @return value associated with key or null if unmapped
      */
     public static Integer getInteger(String key) {
         return getInteger(key, null);
@@ -163,10 +162,10 @@ public class CommonPropertiesHolder {
 
     /**
      * Gets value mapped to key, returning defaultValue if unmapped.
-     * 
-     * @param  key          to be found
-     * @param  defaultValue returned if key is unmapped
-     * @return              value associated with key
+     *
+     * @param key to be found
+     * @param defaultValue returned if key is unmapped
+     * @return value associated with key
      */
     public static Long getLong(String key, Long defaultValue) {
         String value = get().get(key);
@@ -183,9 +182,9 @@ public class CommonPropertiesHolder {
      * mapped to a value and by returning the primitive object wrapper we can return null. If the key does not exist the
      * return value of this method is assigned directly to a primitive, a {@link NullPointerException} will be thrown.
      * </p>
-     * 
-     * @param  key to be found
-     * @return     value associated with key or null if unmapped
+     *
+     * @param key to be found
+     * @return value associated with key or null if unmapped
      */
     public static Long getLong(String key) {
         return getLong(key, null);
@@ -193,7 +192,7 @@ public class CommonPropertiesHolder {
 
     /**
      * getAuditFormatInterval
-     * 
+     *
      * @return
      */
     public static long getAuditFormatInterval() {
@@ -202,6 +201,7 @@ public class CommonPropertiesHolder {
 
     /**
      * isResponseAfterSave
+     *
      * @return
      */
     public static boolean isResponseAfterSave() {
@@ -210,6 +210,7 @@ public class CommonPropertiesHolder {
 
     /**
      * get maxResponseTimeout
+     *
      * @return the maxResponseTimeout
      */
     public static long getMaxResponseTimeout() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/ThirdPartyClusterConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
similarity index 75%
rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/ThirdPartyClusterConfigHolder.java
rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
index c15ca189c..e2cd3c8c1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/ThirdPartyClusterConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
@@ -18,27 +18,23 @@
 package org.apache.inlong.dataproxy.config.holder;
 
 import com.google.common.base.Splitter;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * value is map
+ * Holder of the MQ cluster config.
  */
-public class ThirdPartyClusterConfigHolder extends PropertiesConfigHolder {
+public class MQClusterConfigHolder extends PropertiesConfigHolder {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ThirdPartyClusterConfigHolder.class);
-    private final ThirdPartyClusterConfig clusterConfig = new ThirdPartyClusterConfig();
+    public static final String URL_STORE_PREFIX = "mq_cluster.index";
 
+    private final MQClusterConfig clusterConfig = new MQClusterConfig();
 
-    public static final String URL_STORE_PREFIX = "third-party-cluster.index";
-
-    public ThirdPartyClusterConfigHolder(String fileName) {
+    public MQClusterConfigHolder(String fileName) {
         super(fileName);
     }
 
@@ -49,7 +45,6 @@ public class ThirdPartyClusterConfigHolder extends PropertiesConfigHolder {
     public void loadFromFileToHolder() {
         super.loadFromFileToHolder();
         Map<String, String> tmpUrl2token = new HashMap<>();
-        Map<String, String> tmpConfig = new HashMap<>();
         for (Map.Entry<String, String> entry : getHolder().entrySet()) {
             if (entry.getKey().startsWith(URL_STORE_PREFIX)) {
                 List<String> kv = Splitter.on(AttributeConstants.KEY_VALUE_SEPARATOR)
@@ -64,15 +59,15 @@ public class ThirdPartyClusterConfigHolder extends PropertiesConfigHolder {
         clusterConfig.putAll(getHolder());
     }
 
-    public void setUrl2token(Map<String, String> newUrl2Token) {
-        clusterConfig.setUrl2token(newUrl2Token);
-    }
-
     public Map<String, String> getUrl2token() {
         return clusterConfig.getUrl2token();
     }
 
-    public ThirdPartyClusterConfig getClusterConfig() {
+    public void setUrl2token(Map<String, String> newUrl2Token) {
+        clusterConfig.setUrl2token(newUrl2Token);
+    }
+
+    public MQClusterConfig getClusterConfig() {
         return clusterConfig;
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
index 4a193a3ac..c332de8fa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
@@ -141,7 +141,7 @@ public class PropertiesConfigHolder extends ConfigHolder {
                 try {
                     inStream.close();
                 } catch (IOException e) {
-                    LOG.error("fail to close input stream at loadTopics from {}, err {}", fileName, e);
+                    LOG.error("fail to loadTopics in inStream.close for file: {}", fileName, e);
                 }
             }
         }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
index d029442d3..2e2c7c63a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,51 +17,43 @@
 
 package org.apache.inlong.dataproxy.config.loader;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
+import java.net.URL;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
- * 
- * FileCommonPropertiesLoader
+ * Class resource common properties loader
  */
 public class ClassResourceCommonPropertiesLoader implements CommonPropertiesLoader {
 
-    public static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+    private static final String FILE_NAME = "common.properties";
 
     /**
-     * load
-     * 
-     * @return
+     * load properties
      */
     @Override
     public Map<String, String> load() {
-        return this.loadProperties("common.properties");
+        return this.loadProperties();
     }
 
-    /**
-     * loadProperties
-     * 
-     * @param  fileName
-     * @return
-     */
-    protected Map<String, String> loadProperties(String fileName) {
+    protected Map<String, String> loadProperties() {
         Map<String, String> result = new ConcurrentHashMap<>();
-        try (InputStream inStream = getClass().getClassLoader().getResource(fileName).openStream()) {
+        URL resource = getClass().getClassLoader().getResource(FILE_NAME);
+        try (InputStream inStream = Objects.requireNonNull(resource).openStream()) {
             Properties props = new Properties();
             props.load(inStream);
             for (Map.Entry<Object, Object> entry : props.entrySet()) {
                 result.put((String) entry.getKey(), (String) entry.getValue());
             }
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("fail to load properties, file ={}, and e= {}", fileName, e);
         } catch (Exception e) {
-            LOG.error("fail to load properties, file ={}, and e= {}", fileName, e);
+            LOG.error("fail to load properties from file ={}", FILE_NAME, e);
         }
         return result;
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
index fcf839ddf..92d8f7061 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -20,15 +20,15 @@ package org.apache.inlong.dataproxy.config.loader;
 import java.util.Map;
 
 /**
- * 
- * CommonPropertiesLoader
+ * Interface of common properties loader
  */
 public interface CommonPropertiesLoader {
 
     /**
      * load
-     * 
-     * @return
+     *
+     * @return the configuration map
      */
     Map<String, String> load();
+
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/ThirdPartyClusterConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
similarity index 84%
rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/ThirdPartyClusterConfig.java
rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
index 27639e663..5a941096b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/ThirdPartyClusterConfig.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
@@ -24,112 +24,124 @@ import org.apache.pulsar.shade.io.netty.util.internal.SystemPropertyUtil;
 import java.util.HashMap;
 import java.util.Map;
 
-public class ThirdPartyClusterConfig extends Context {
+public class MQClusterConfig extends Context {
 
-    private Map<String, String> url2token = new HashMap<>();
-
-    /*
-     * properties key for pulsar client
-     */
-    public static final String MQ_SERVER_URL_LIST = "mq_server_url_list";
-    /*
-     * properties key pulsar producer
-     */
-    public static final String TOKEN = "mq_token";
-    public static final String PULSAR_AUTH_TYPE = "pulsar_auth_type";
+    private static final String SEND_REMOTE = "send_remote";
+    private static final boolean DEFAULT_SEND_REMOTE = false;
     private static final String SEND_TIMEOUT = "send_timeout_mill";
+    private static final int DEFAULT_SEND_TIMEOUT_MILL = 30 * 1000;
+
+    private static final String CLIENT_ID_CACHE = "client_id_cache";
+    private static final boolean DEFAULT_CLIENT_ID_CACHE = true;
     private static final String CLIENT_TIMEOUT = "client_timeout_second";
+    private static final int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
+
     private static final String ENABLE_BATCH = "enable_batch";
+    private static final boolean DEFAULT_ENABLE_BATCH = true;
+
     private static final String BLOCK_IF_QUEUE_FULL = "block_if_queue_full";
+    private static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
+
     private static final String MAX_PENDING_MESSAGES = "max_pending_messages";
-    private static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS =
-            "max_pending_messages_across_partitions";
+    private static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
+
+    private static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "max_pending_messages_across_partitions";
+    private static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 500000;
+
     private static final String COMPRESSION_TYPE = "compression_type";
+    private static final String DEFAULT_COMPRESSION_TYPE = "NONE";
+
     private static final String MAX_BATCHING_MESSAGES = "max_batching_messages";
-    private static final String RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = "retry_interval_when_send_error_ms";
-    private static final String SINK_THREAD_NUM = "thread_num";
-    private static final String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
-    private static final String PULSAR_IO_THREADS = "pulsar_io_threads";
-    private static final String PULSAR_CONNECTIONS_PRE_BROKER = "connections_pre_broker";
+    private static final int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
+
     private static final String MAX_BATCHING_BYTES = "max_batching_bytes";
-    private static final String MAX_BATCHING_PUBLISH_DELAY_MILLIS =
-            "max_batching_publish_delay_millis";
-    private static final String EVENT_QUEUE_SIZE = "event_queue_size";
-    private static final String BAD_EVENT_QUEUE_SIZE = "bad_event_queue_size";
-    private static final String MAX_RETRY_SEND_TIMES = "max_retry_send_times";
+    private static final int DEFAULT_MAX_BATCHING_BYTES = 128 * 1024;
 
-    private static final String SLA_METRIC_SINK = "sla_metric_sink";
+    private static final String MAX_BATCHING_PUBLISH_DELAY_MILLIS = "max_batching_publish_delay_millis";
+    private static final long DEFAULT_MAX_BATCHING_PUBLISH_DELAY_MILLIS = 1L;
 
+    private static final String RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = "retry_interval_when_send_error_ms";
+    private static final long DEFAULT_RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = 30 * 1000L;
 
-    // log params
-    private static final String LOG_TOPIC = "proxy_log_topic";
-    private static final String LOG_STREAMID = "proxy_log_streamid";
-    private static final String LOG_GROUPID = "proxy_log_groupid";
-    private static final String SEND_REMOTE = "send_remote";
+    private static final String SLA_METRIC_SINK = "sla_metric_sink";
+    private static final boolean DEFAULT_SLA_METRIC_SINK = false;
 
-    // tubemq params
+    // TubeMQ params
     private static final String MAX_SURVIVED_TIME = "max_survived_time";
+    private static final int DEFAULT_MAX_SURVIVED_TIME = 300000;
+
     private static final String MAX_SURVIVED_SIZE = "max_survived_size";
+    private static final int DEFAULT_MAX_SURVIVED_SIZE = 3000000;
+
     private static final String NEW_CHECK_PATTERN = "new_check_pattern";
+    private static final boolean DEFAULT_NEW_CHECK_PATTERN = true;
+
     private static final String OLD_METRIC_ON = "old_metric_on";
+    private static final boolean DEFAULT_OLD_METRIC_ON = true;
+
     private static final String SET_VALUE = "set";
+    private static final int DEFAULT_SET_VALUE = 10;
+
     private static final String MAX_TOPICS_EACH_PRODUCER_HOLD = "max_topic_each_producer_hold";
+    private static final int DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
+
     private static final String TUBE_REQUEST_TIMEOUT = "tube_request_timeout";
-    public static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count";
-    public static final String SESSION_WARN_DELAYED_MSG_COUNT = "session_warn_delayed_msg_count";
-    public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session_max_allowed_delayed_msg_count";
-    public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
-    public static final String RECOVER_THREAD_COUNT = "recover_thread_count";
+    private static final int DEFAULT_TUBE_REQUEST_TIMEOUT = 60;
+
+    private static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count";
+    private static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 80000L;
+
+    private static final String SESSION_WARN_DELAYED_MSG_COUNT = "session_warn_delayed_msg_count";
+    private static final long DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT = 2000000L;
+
+    private static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session_max_allowed_delayed_msg_count";
+    private static final long DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 4000000L;
+
+    private static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
+    private static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L;
+
+    private static final String RECOVER_THREAD_COUNT = "recover_thread_count";
+    private static final int DEFAULT_RECOVER_THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1;
+
+    // log params
+    private static final String LOG_TOPIC = "proxy_log_topic";
+    private static final String DEFAULT_LOG_TOPIC = "manager";
+    private static final String LOG_GROUP_ID = "proxy_log_groupid";
+    private static final String DEFAULT_LOG_GROUP_ID = "proxy_measure_log";
+    private static final String LOG_STREAM_ID = "proxy_log_streamid";
+    private static final String DEFAULT_LOG_STREAM_ID = "manager";
+    private static final String LOG_EVERY_N_EVENTS = "log_every_n_events";
+    private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
 
     /*
-     * properties for stat
+     * stat params
      */
     private static final String STAT_INTERVAL_SEC = "stat_interval_sec";
-    private static final String LOG_EVERY_N_EVENTS = "log_every_n_events";
-    private static final String CLIENT_ID_CACHE = "client_id_cache";
-
-    public static String PULSAR_DEFAULT_AUTH_TYPE = "token";
-    private static final int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
-    private static final int DEFAULT_SEND_TIMEOUT_MILL = 30 * 1000;
-    private static final long DEFAULT_RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = 30 * 1000L;
-    private static final boolean DEFAULT_ENABLE_BATCH = true;
-    private static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
-    private static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
-    private static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 500000;
-    private static final String DEFAULT_COMPRESSION_TYPE = "NONE";
-    private static final int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
-    private static final int DEFAULT_MAX_BATCHING_BYTES = 128 * 1024;
-    private static final long DEFAULT_MAX_BATCHING_PUBLISH_DELAY_MILLIS = 1L;
-    private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
     private static final int DEFAULT_STAT_INTERVAL_SEC = 60;
+
+    /*
+     * Pulsar params
+     */
+    private static final String SINK_THREAD_NUM = "thread_num";
     private static final int DEFAULT_THREAD_NUM = 4;
-    private static final boolean DEFAULT_CLIENT_ID_CACHE = true;
+    private static final String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
     private static final long DEFAULT_DISK_IO_RATE_PER_SEC = 0L;
-    private static final int DEFAULT_EVENT_QUEUE_SIZE = 10000;
-    private static final int DEFAULT_BAD_EVENT_QUEUE_SIZE = 10000;
+    private static final String PULSAR_IO_THREADS = "pulsar_io_threads";
     private static final int DEFAULT_PULSAR_IO_THREADS = Math.max(1, SystemPropertyUtil
             .getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
+    private static final String PULSAR_CONNECTIONS_PRE_BROKER = "connections_pre_broker";
     private static final int DEFAULT_CONNECTIONS_PRE_BROKER = 1;
+    private static final String EVENT_QUEUE_SIZE = "event_queue_size";
+    private static final int DEFAULT_EVENT_QUEUE_SIZE = 10000;
+    private static final String BAD_EVENT_QUEUE_SIZE = "bad_event_queue_size";
+    private static final int DEFAULT_BAD_EVENT_QUEUE_SIZE = 10000;
+    private static final String MAX_RETRY_SEND_TIMES = "max_retry_send_times";
     private static final int DEFAULT_MAX_RETRY_SEND_TIMES = 16;
-    private static final boolean DEFAULT_SLA_METRIC_SINK = false;
 
-    private static final String DEFAULT_LOG_TOPIC = "teg_manager";
-    private static final String DEFAULT_LOG_STREAMID = "b_teg_manager";
-    private static final String DEFAULT_LOG_GROUPID = "proxy_measure_log";
-    private static final boolean DEFAULT_SEND_REMOTE = false;
+    public static String PULSAR_AUTH_TYPE = "pulsar_auth_type";
+    public static String PULSAR_DEFAULT_AUTH_TYPE = "token";
 
-    private static final int DEFAULT_MAX_SURVIVED_TIME = 300000;
-    private static final int DEFAULT_MAX_SURVIVED_SIZE = 3000000;
-    private static final boolean DEFAULT_NEW_CHECK_PATTERN = true;
-    private static final boolean DEFAULT_OLD_METRIC_ON = true;
-    private static final int DEFAULT_SET_VALUE = 10;
-    private static final int DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
-    private static final int DEFAULT_TUBE_REQUEST_TIMEOUT = 60;
-    public static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 80000L;
-    public static final long DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT = 2000000L;
-    public static final long DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 4000000L;
-    public static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L;
-    public static int DEFAULT_RECOVER_THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1;
+    private Map<String, String> url2token = new HashMap<>();
 
     public long getLinkMaxAllowedDelayedMsgCount() {
         return getLong(LINK_MAX_ALLOWED_DELAYED_MSG_COUNT, DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT);
@@ -281,11 +293,11 @@ public class ThirdPartyClusterConfig extends Context {
     }
 
     public String getLogStreamId() {
-        return getString(LOG_STREAMID, DEFAULT_LOG_STREAMID);
+        return getString(LOG_STREAM_ID, DEFAULT_LOG_STREAM_ID);
     }
 
     public String getLogGroupId() {
-        return getString(LOG_GROUPID, DEFAULT_LOG_GROUPID);
+        return getString(LOG_GROUP_ID, DEFAULT_LOG_GROUP_ID);
     }
 
     public boolean getEnableSendRemote() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
index c4cbbd5d2..657480d78 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
@@ -17,73 +17,76 @@
 
 package org.apache.inlong.dataproxy.consts;
 
-public interface AttributeConstants {
-
-    String SEPARATOR = "&";
-    String KEY_VALUE_SEPARATOR = "=";
-
-    String REQUEST_TYPE = "requestType";
+/**
+ * Attribute constants
+ */
+public class AttributeConstants {
 
-    String OPERATION_TYPE = "operationType";
+    public static final String SEPARATOR = "&";
+    public static final String KEY_VALUE_SEPARATOR = "=";
 
-    String OPERATION_CONTENT = "content";
+    public static final String BODY = "body";
+    public static final String CHARSET = "UTF-8";
+    public static final String HTTP_REQUEST = "http-request";
+    public static final String HTTP_RESPONSE = "http-response";
 
     /**
      * group id unique string id for each business or product
      */
-    String GROUP_ID = "groupId";
+    public static final String GROUP_ID = "groupId";
 
     /**
      * interface id unique string id for each interface of business An interface stand for a kind of
      * data
      */
-    String STREAM_ID = "streamId";
+    public static final String STREAM_ID = "streamId";
 
     /**
      * iname is like a streamId but used in file protocol(m=xxx)
      */
-    String INAME = "iname";
+    public static final String INAME = "iname";
 
     /**
      * data time
      */
-    String DATA_TIME = "dt";
+    public static final String DATA_TIME = "dt";
 
-    String TIME_STAMP = "t";
+    public static final String TIME_STAMP = "t";
 
     /* compress type */
-    String COMPRESS_TYPE = "cp";
+    public static final String COMPRESS_TYPE = "cp";
 
     /* count value for how many records a message body contains */
-    String MESSAGE_COUNT = "cnt";
+    public static final String MESSAGE_COUNT = "cnt";
 
     /* message type */
-    String MESSAGE_TYPE = "mt";
+    public static final String MESSAGE_TYPE = "mt";
 
     /* sort type */
-    String METHOD = "m";
+    public static final String METHOD = "m";
 
     /* global unique id for a message*/
-    String SEQUENCE_ID = "sid";
+    public static final String SEQUENCE_ID = "sid";
 
-    String UNIQ_ID = "uniq";
+    public static final String UNIQ_ID = "uniq";
 
     /* from where */
-    String FROM = "f";
+    public static final String FROM = "f";
+
+    public static final String RCV_TIME = "rt";
 
-    String RCV_TIME = "rt";
+    public static final String NODE_IP = "NodeIP";
 
-    String NODE_IP = "NodeIP";
+    public static final String NUM2NAME = "num2name";
 
-    String NUM2NAME = "num2name";
+    public static final String GROUPID_NUM = "groupIdnum";
 
-    String GROUPID_NUM = "groupIdnum";
+    public static final String STREAMID_NUM = "streamIdnum";
 
-    String STREAMID_NUM = "streamIdnum";
+    public static final String MESSAGE_PARTITION_KEY = "partitionKey";
 
-    String MESSAGE_PARTITION_KEY = "partitionKey";
+    public static final String MESSAGE_SYNC_SEND = "syncSend";
 
-    String MESSAGE_SYNC_SEND = "syncSend";
+    public static final String MESSAGE_IS_ACK = "isAck";
 
-    String MESSAGE_IS_ACK = "isAck";
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index e3b6d8ea7..3e86620eb 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -118,4 +118,8 @@ public class ConfigConstants {
     public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
     public static final String RECOVER_THREAD_COUNT = "recover_thread_count";
 
+    public static final String MANAGER_PATH = "/api/inlong/manager/openapi";
+    public static final String MANAGER_GET_CONFIG_PATH = "/dataproxy/getConfig";
+    public static final String MANAGER_GET_ALL_CONFIG_PATH = "/dataproxy/getAllConfig";
+
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/Context.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/Context.java
index 87c643f6e..c40ff0553 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/Context.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/Context.java
@@ -31,7 +31,7 @@ public interface Context {
 
     void clear();
 
-    void destory();
+    void destroy();
 
     boolean containsKey(String key);
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
index 1535e172b..053209b3b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
@@ -18,30 +18,29 @@
 package org.apache.inlong.dataproxy.http;
 
 import com.google.common.base.Preconditions;
-import org.apache.inlong.common.monitor.CounterGroup;
-import org.apache.inlong.common.monitor.CounterGroupExt;
-import org.apache.inlong.common.monitor.StatConstants;
-import org.apache.inlong.common.monitor.StatRunner;
-import java.util.HashSet;
-import java.util.Set;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.AbstractSource;
-import org.apache.inlong.dataproxy.utils.ConfStringUtils;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.common.monitor.CounterGroup;
+import org.apache.inlong.common.monitor.CounterGroupExt;
+import org.apache.inlong.common.monitor.StatConstants;
+import org.apache.inlong.common.monitor.StatRunner;
 import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.utils.ConfStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HttpBaseSource
-        extends AbstractSource
-        implements EventDrivenSource, Configurable {
+import java.util.HashSet;
+import java.util.Set;
 
-    private static final Logger logger = LoggerFactory.getLogger(HttpBaseSource.class);
+public class HttpBaseSource extends AbstractSource implements EventDrivenSource, Configurable {
 
+    private static final Logger logger = LoggerFactory.getLogger(HttpBaseSource.class);
+    private static final String CONNECTIONS = "connections";
     protected int port;
     protected String host = null;
     protected int maxMsgLength;
@@ -49,18 +48,14 @@ public class HttpBaseSource
     protected String attr;
     protected String messageHandlerName;
     protected boolean filterEmptyMsg;
-    private int statIntervalSec;
-
     protected CounterGroup counterGroup;
     protected CounterGroupExt counterGroupExt;
-
-    private StatRunner statRunner;
-    private Thread statThread;
     protected int maxConnections = Integer.MAX_VALUE;
-    private static final String CONNECTIONS = "connections";
-
     protected boolean customProcessor = false;
     protected Context context;
+    private int statIntervalSec;
+    private StatRunner statRunner;
+    private Thread statThread;
 
     public HttpBaseSource() {
         super();
@@ -71,14 +66,13 @@ public class HttpBaseSource
     @Override
     public synchronized void start() {
         if (statIntervalSec > 0) {
-            Set<String> moniterNames = new HashSet<String>();
-            moniterNames.add(StatConstants.EVENT_SUCCESS);
-            moniterNames.add(StatConstants.EVENT_DROPPED);
-            moniterNames.add(StatConstants.EVENT_EMPTY);
-            moniterNames.add(StatConstants.EVENT_OTHEREXP);
-            moniterNames.add(StatConstants.EVENT_INVALID);
-            statRunner = new StatRunner(getName(), counterGroup, counterGroupExt, statIntervalSec,
-                    moniterNames);
+            Set<String> monitorNames = new HashSet<>();
+            monitorNames.add(StatConstants.EVENT_SUCCESS);
+            monitorNames.add(StatConstants.EVENT_DROPPED);
+            monitorNames.add(StatConstants.EVENT_EMPTY);
+            monitorNames.add(StatConstants.EVENT_OTHEREXP);
+            monitorNames.add(StatConstants.EVENT_INVALID);
+            statRunner = new StatRunner(getName(), counterGroup, counterGroupExt, statIntervalSec, monitorNames);
             statThread = new Thread(statRunner);
             statThread.setName("Thread-Stat-" + this.getName());
             statThread.start();
@@ -107,7 +101,7 @@ public class HttpBaseSource
                 }
 
             } catch (InterruptedException e) {
-                logger.warn("statrunner interrupted");
+                logger.warn("start runner interrupted");
             }
         }
 
@@ -116,7 +110,6 @@ public class HttpBaseSource
 
     /**
      * configure
-     * @param context
      */
     public void configure(Context context) {
         this.context = context;
@@ -157,7 +150,7 @@ public class HttpBaseSource
         try {
             maxConnections = context.getInteger(CONNECTIONS, 5000);
         } catch (NumberFormatException e) {
-            logger.warn("BaseSource\'s \"connections\" property must specify an integer value. {}",
+            logger.warn("BaseSource connections property must specify an integer value {}",
                     context.getString(CONNECTIONS));
         }
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpSourceConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpSourceConstants.java
deleted file mode 100644
index 49b06b436..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpSourceConstants.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.http;
-
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
-
-public interface HttpSourceConstants
-        extends AttributeConstants {
-
-    String BODY = "body";
-    String CHARSET = "UTF-8";
-
-    String HTTP_REQUEST = "http-request";
-    String HTTP_RESPONSE = "http-response";
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MappedContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MappedContext.java
index 497848670..550208de5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MappedContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MappedContext.java
@@ -22,8 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-public class MappedContext
-        implements Context {
+public class MappedContext implements Context {
 
     private Map<String, Object> mapContext;
 
@@ -33,7 +32,7 @@ public class MappedContext
 
     @Override
     public void init() {
-        mapContext = new HashMap();
+        mapContext = new HashMap<>();
     }
 
     @Override
@@ -57,7 +56,7 @@ public class MappedContext
     }
 
     @Override
-    public void destory() {
+    public void destroy() {
         mapContext = Collections.emptyMap();
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
index 0b3426019..af903b482 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
@@ -17,12 +17,12 @@
 
 package org.apache.inlong.dataproxy.http;
 
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.GROUP_ID;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.DATA_TIME;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.STREAM_ID;
-import static org.apache.inlong.dataproxy.http.HttpSourceConstants.BODY;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.ChannelException;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
@@ -31,19 +31,11 @@ import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.lang.StringUtils;
-import org.apache.flume.ChannelException;
-import org.apache.inlong.common.monitor.LogCounter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
 
-public class MessageFilter
-        implements Filter {
+public class MessageFilter implements Filter {
 
     private static final Logger LOG = LoggerFactory.getLogger(MessageFilter.class);
-    private static final LogCounter logCounter = new LogCounter(10,
-            100000, 60 * 1000);
 
     private final int maxMsgLength;
 
@@ -56,8 +48,7 @@ public class MessageFilter
     }
 
     @Override
-    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
-            throws IOException, ServletException {
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException {
         HttpServletRequest req = (HttpServletRequest) request;
         HttpServletResponse resp = (HttpServletResponse) response;
 
@@ -76,10 +67,10 @@ public class MessageFilter
         }
 
         String invalidKey = null;
-        String groupId = req.getParameter(GROUP_ID);
-        String streamId = req.getParameter(STREAM_ID);
-        String dt = req.getParameter(DATA_TIME);
-        String body = req.getParameter(BODY);
+        String groupId = req.getParameter(AttributeConstants.GROUP_ID);
+        String streamId = req.getParameter(AttributeConstants.STREAM_ID);
+        String dt = req.getParameter(AttributeConstants.DATA_TIME);
+        String body = req.getParameter(AttributeConstants.BODY);
 
         if (StringUtils.isEmpty(groupId)) {
             invalidKey = "groupId";
@@ -92,7 +83,6 @@ public class MessageFilter
         }
 
         try {
-
             if (invalidKey != null) {
                 LOG.warn("Received bad request from client. " + invalidKey + " is empty.");
                 code = StatusCode.ILLEGAL_ARGUMENT;
@@ -129,7 +119,7 @@ public class MessageFilter
     private String getResultContent(int code, String message, String callback) {
         StringBuilder builder = new StringBuilder();
         if (StringUtils.isNotEmpty(callback)) {
-            builder.append(callback + "(");
+            builder.append(callback).append("(");
         }
         builder.append("{\"code\":\"");
         builder.append(code);
@@ -142,4 +132,5 @@ public class MessageFilter
 
         return builder.toString();
     }
+
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageHandler.java
index 4804fbabc..ee3d7aa9f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageHandler.java
@@ -20,8 +20,7 @@ package org.apache.inlong.dataproxy.http;
 import org.apache.flume.conf.Configurable;
 import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
 
-public interface MessageHandler
-        extends Configurable {
+public interface MessageHandler extends Configurable {
 
     void init();
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
index 3e63d9497..7fe8757c4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
@@ -17,27 +17,19 @@
 
 package org.apache.inlong.dataproxy.http;
 
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.GROUP_ID;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.DATA_TIME;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.STREAM_ID;
-import static org.apache.inlong.dataproxy.http.HttpSourceConstants.BODY;
-import static org.apache.inlong.dataproxy.http.HttpSourceConstants.HTTP_REQUEST;
-import static org.apache.inlong.dataproxy.http.HttpSourceConstants.HTTP_RESPONSE;
-
 import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MessageProcessServlet
-        extends HttpServlet {
+public class MessageProcessServlet extends HttpServlet {
 
     private static final Logger LOG = LoggerFactory.getLogger(MessageProcessServlet.class);
     private static final LogCounter logCounter = new LogCounter(10, 100000, 60 * 1000);
@@ -55,18 +47,16 @@ public class MessageProcessServlet
     }
 
     @Override
-    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
-            throws ServletException, IOException {
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp) {
         try {
             Context context = new MappedContext();
+            context.put(AttributeConstants.GROUP_ID, req.getParameter(AttributeConstants.GROUP_ID));
+            context.put(AttributeConstants.STREAM_ID, req.getParameter(AttributeConstants.STREAM_ID));
+            context.put(AttributeConstants.DATA_TIME, req.getParameter(AttributeConstants.DATA_TIME));
+            context.put(AttributeConstants.BODY, req.getParameter(AttributeConstants.BODY));
 
-            context.put(GROUP_ID, req.getParameter(GROUP_ID));
-            context.put(STREAM_ID, req.getParameter(STREAM_ID));
-            context.put(DATA_TIME, req.getParameter(DATA_TIME));
-            context.put(BODY, req.getParameter(BODY));
-
-            context.put(HTTP_REQUEST, req);
-            context.put(HTTP_RESPONSE, resp);
+            context.put(AttributeConstants.HTTP_REQUEST, req);
+            context.put(AttributeConstants.HTTP_RESPONSE, resp);
 
             messageHandler.processMessage(context);
         } catch (MessageProcessException e) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleHttpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleHttpSource.java
index b5cc6b4e3..40c9c8d41 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleHttpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleHttpSource.java
@@ -46,8 +46,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SimpleHttpSource
-        extends HttpBaseSource {
+public class SimpleHttpSource extends HttpBaseSource {
 
     private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 9dbeec9ee..79ecfa03a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -17,21 +17,11 @@
 
 package org.apache.inlong.dataproxy.http;
 
-import com.google.common.base.Splitter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URL;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import javax.servlet.http.HttpServletRequest;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flume.Event;
 import org.apache.inlong.common.monitor.CounterGroup;
 import org.apache.inlong.common.monitor.CounterGroupExt;
 import org.apache.inlong.common.monitor.MonitorIndex;
@@ -42,43 +32,44 @@ import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.source.ServiceDecoder;
 import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
-
+import org.apache.inlong.dataproxy.source.ServiceDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SimpleMessageHandler
-        implements MessageHandler {
+import javax.servlet.http.HttpServletRequest;
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SimpleMessageHandler implements MessageHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageHandler.class);
     private static final ConfigManager configManager = ConfigManager.getInstance();
-
+    private static final String SEPARATOR = "#";
+    private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER =
+            new ThreadLocal<SimpleDateFormat>() {
+                @Override
+                protected SimpleDateFormat initialValue() {
+                    return new SimpleDateFormat("yyyyMMddHHmm");
+                }
+            };
+    private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
     private final CounterGroup counterGroup;
     private final CounterGroupExt counterGroupExt;
-
-    private static final String SEPARATOR = "#";
-    private final boolean isNewMetricOn = true;
     private final MonitorIndex monitorIndex = new MonitorIndex("Source", 60, 300000);
     private final MonitorIndexExt monitorIndexExt =
             new MonitorIndexExt("DataProxy_monitors#http", 60, 100000);
+    private final ChannelProcessor processor;
+
+    private final boolean isNewMetricOn = true;
 
     @SuppressWarnings("unused")
     private int maxMsgLength;
     private long logCounter = 0L;
     private long channelTrace = 0L;
 
-    private static final ThreadLocal<SimpleDateFormat> dateFormator =
-            new ThreadLocal<SimpleDateFormat>() {
-                @Override
-                protected SimpleDateFormat initialValue() {
-                    return new SimpleDateFormat("yyyyMMddHHmm");
-                }
-            };
-
-    private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
-    private final ChannelProcessor processor;
-
     public SimpleMessageHandler(ChannelProcessor processor, CounterGroup counterGroup,
             CounterGroupExt counterGroupExt, ServiceDecoder decoder) {
         this.processor = processor;
@@ -98,14 +89,13 @@ public class SimpleMessageHandler
 
     @Override
     public void processMessage(Context context) throws MessageProcessException {
-        String topic = "test";
-        String topicValue = topic;
+        String topicValue = "test";
         String attr = "m=0";
-        StringBuffer newAttrBuffer = new StringBuffer(attr);
+        StringBuilder newAttrBuffer = new StringBuilder(attr);
 
-        String groupId = (String) context.get(HttpSourceConstants.GROUP_ID);
-        String streamId = (String) context.get(HttpSourceConstants.STREAM_ID);
-        String dt = (String) context.get(HttpSourceConstants.DATA_TIME);
+        String groupId = (String) context.get(AttributeConstants.GROUP_ID);
+        String streamId = (String) context.get(AttributeConstants.STREAM_ID);
+        String dt = (String) context.get(AttributeConstants.DATA_TIME);
 
         String value = getTopic(groupId, streamId);
         if (null != value && !"".equals(value)) {
@@ -114,34 +104,34 @@ public class SimpleMessageHandler
 
         String mxValue = configManager.getMxProperties().get(groupId);
         if (null != mxValue) {
-            newAttrBuffer = new StringBuffer(mxValue.trim());
+            newAttrBuffer = new StringBuilder(mxValue.trim());
         }
 
         newAttrBuffer.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
                 .append("&dt=").append(dt);
         HttpServletRequest request =
-                (HttpServletRequest) context.get(HttpSourceConstants.HTTP_REQUEST);
+                (HttpServletRequest) context.get(AttributeConstants.HTTP_REQUEST);
         String strRemoteIP = request.getRemoteAddr();
         newAttrBuffer.append("&NodeIP=").append(strRemoteIP);
-        String msgCount = request.getParameter(HttpSourceConstants.MESSAGE_COUNT);
+        String msgCount = request.getParameter(AttributeConstants.MESSAGE_COUNT);
         if (msgCount == null || "".equals(msgCount)) {
             msgCount = "1";
         }
 
         InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
-        String charset = (String) context.get(HttpSourceConstants.CHARSET);
+        String charset = (String) context.get(AttributeConstants.CHARSET);
         if (charset == null || "".equals(charset)) {
             charset = "UTF-8";
         }
-        String body = (String) context.get(HttpSourceConstants.BODY);
+        String body = (String) context.get(AttributeConstants.BODY);
         try {
             inLongMsg.addMsg(newAttrBuffer.toString(), body.getBytes(charset));
         } catch (UnsupportedEncodingException e) {
             throw new MessageProcessException(e);
         }
 
-        Map<String, String> headers = new HashMap<String, String>();
-        headers.put(HttpSourceConstants.DATA_TIME, dt);
+        Map<String, String> headers = new HashMap<>();
+        headers.put(AttributeConstants.DATA_TIME, dt);
         headers.put(ConfigConstants.TOPIC_KEY, topicValue);
         headers.put(AttributeConstants.STREAM_ID, streamId);
         headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
@@ -149,14 +139,14 @@ public class SimpleMessageHandler
         headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount);
         byte[] data = inLongMsg.buildArray();
         headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
-        String pkgTime = dateFormator.get().format(inLongMsg.getCreatetime());
+        String pkgTime = DATE_FORMATTER.get().format(inLongMsg.getCreatetime());
         headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime);
         Event event = EventBuilder.withBody(data, headers);
 
         String counterExtKey = topicValue + "#" + 0 + "#" + strRemoteIP + "#time#" + pkgTime;
         counterGroupExt.addAndGet(counterExtKey, Long.valueOf(msgCount));
 
-        long dtten = 0;
+        long dtten;
         try {
             dtten = Long.parseLong(dt);
         } catch (NumberFormatException e1) {
@@ -167,16 +157,15 @@ public class SimpleMessageHandler
         dtten = dtten / 1000 / 60 / 10;
         dtten = dtten * 1000 * 60 * 10;
 
-        StringBuilder newbase = new StringBuilder();
-        newbase.append("http").append(SEPARATOR).append(topicValue).append(SEPARATOR)
+        StringBuilder newBase = new StringBuilder();
+        newBase.append("http").append(SEPARATOR).append(topicValue).append(SEPARATOR)
                 .append(streamId).append(SEPARATOR).append(strRemoteIP).append(SEPARATOR)
                 .append(NetworkUtils.getLocalIp()).append(SEPARATOR)
                 .append(new SimpleDateFormat("yyyyMMddHHmm").format(dtten)).append(SEPARATOR)
                 .append(pkgTime);
 
         if (isNewMetricOn) {
-            monitorIndex
-                    .addAndGet(new String(newbase), Integer.parseInt(msgCount), 1, data.length, 0);
+            monitorIndex.addAndGet(new String(newBase), Integer.parseInt(msgCount), 1, data.length, 0);
         }
         inLongMsg.reset();
 
@@ -190,29 +179,27 @@ public class SimpleMessageHandler
             counterGroup.incrementAndGet(StatConstants.EVENT_DROPPED);
             monitorIndexExt.incrementAndGet("EVENT_DROPPED");
             if (isNewMetricOn) {
-                monitorIndex.addAndGet(new String(newbase), 0, 0, 0, Integer.parseInt(msgCount));
+                monitorIndex.addAndGet(new String(newBase), 0, 0, 0, Integer.parseInt(msgCount));
             }
 
             logCounter++;
             if (logCounter == 1 || logCounter % 1000 == 0) {
-                LOG.error("Error writting to channel,and will retry after 1s,ex={},"
-                        + "logCounter = {}, spend time={} ms", new Object[]{ex.toString(), logCounter,
-                        System.currentTimeMillis() - beginTime});
+                LOG.error("Error writing to channel, and will retry after 1s, ex={},"
+                        + "logCounter={}, spend time={} ms", ex, logCounter, System.currentTimeMillis() - beginTime);
                 if (logCounter > Long.MAX_VALUE - 10) {
                     logCounter = 0;
-                    LOG.info("logCounter will reverse.");
+                    LOG.info("logCounter will reverse");
                 }
             }
             throw ex;
         }
         channelTrace++;
         if (channelTrace % 600000 == 0) {
-            LOG.info("processor.processEvent spend time={} ms",
-                    System.currentTimeMillis() - beginTime);
+            LOG.info("processor.processEvent spend time={} ms", System.currentTimeMillis() - beginTime);
         }
         if (channelTrace > Long.MAX_VALUE - 10) {
             channelTrace = 0;
-            LOG.info("channelTrace will reverse.");
+            LOG.info("channelTrace will reverse");
         }
     }
 
@@ -220,55 +207,6 @@ public class SimpleMessageHandler
     public void configure(org.apache.flume.Context context) {
     }
 
-    private Map<String, String> getAttributeMap(String attrs) {
-        if (null != attrs && !"".equals(attrs)) {
-            try {
-                Splitter.MapSplitter mySplitter =
-                        Splitter.on("&").trimResults().withKeyValueSeparator("=");
-                return mySplitter.split(attrs);
-            } catch (Exception e) {
-                LOG.error("fail to fillTopicKeyandAttr,attr:{}", attrs);
-                LOG.error("error!", e);
-            }
-        }
-        return null;
-    }
-
-    private Map<String, String> loadProperties(String fileName) {
-        HashMap<String, String> map = new HashMap<String, String>();
-        if (null == fileName) {
-            LOG.error("fail to loadTopics, null == fileName.");
-            return map;
-        }
-        InputStream inStream = null;
-        try {
-            URL url = getClass().getClassLoader().getResource(fileName);
-            inStream = url != null ? url.openStream() : null;
-            if (inStream == null) {
-                LOG.error("InputStream {} is null!", fileName);
-                return map;
-            }
-            Properties props = new Properties();
-            props.load(inStream);
-            for (Map.Entry<Object, Object> entry : props.entrySet()) {
-                map.put((String) entry.getKey(), (String) entry.getValue());
-            }
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("fail to loadPropery, file ={}, and e= {}", fileName, e);
-        } catch (Exception e) {
-            LOG.error("fail to loadProperty, file ={}, and e= {}", fileName, e);
-        } finally {
-            if (null != inStream) {
-                try {
-                    inStream.close();
-                } catch (IOException e) {
-                    LOG.error("fail to loadTopics, inStream.close ,and e= {}", fileName, e);
-                }
-            }
-        }
-        return map;
-    }
-
     private String getTopic(String groupId, String streamId) {
         String topic = null;
         if (StringUtils.isNotEmpty(groupId)) {
@@ -279,8 +217,7 @@ public class SimpleMessageHandler
                 topic = configManager.getTopicProperties().get(groupId);
             }
         }
-        LOG.debug("Get topic by groupId/streamId = {}, topic = {}", groupId + "/" + streamId,
-                topic);
+        LOG.debug("Get topic by groupId/streamId = {}, topic = {}", groupId + "/" + streamId, topic);
         return topic;
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
index f4cf573cf..88e411573 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
@@ -22,20 +22,20 @@ public interface StatusCode {
     /*
      * success
      */
-    public static final int SUCCESS = 1;
+    int SUCCESS = 1;
 
     /*
      * illegal argument
      */
-    public static final int ILLEGAL_ARGUMENT = -100;
+    int ILLEGAL_ARGUMENT = -100;
 
     /*
      * exceed length
      */
-    public static final int EXCEED_LEN = -101;
+    int EXCEED_LEN = -101;
 
     /*
      * service error
      */
-    public static final int SERVICE_ERR = -105;
+    int SERVICE_ERR = -105;
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 19f6e5890..380826649 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -44,7 +44,7 @@ import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
 import org.apache.inlong.dataproxy.base.OrderEvent;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
@@ -172,7 +172,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
     private Map<String, String> topicProperties;
 
     private Map<String, String> pulsarCluster;
-    private ThirdPartyClusterConfig pulsarConfig;
+    private MQClusterConfig pulsarConfig;
 
     private static final Long PRINT_INTERVAL = 30L;
 
@@ -215,8 +215,8 @@ public class PulsarSink extends AbstractSink implements Configurable,
 
         configManager = ConfigManager.getInstance();
         topicProperties = configManager.getTopicProperties();
-        pulsarCluster = configManager.getThirdPartyClusterUrl2Token();
-        pulsarConfig = configManager.getThirdPartyClusterConfig(); //pulsar common config
+        pulsarCluster = configManager.getMqClusterUrl2Token();
+        pulsarConfig = configManager.getMqClusterConfig(); //pulsar common config
         commonProperties = configManager.getCommonProperties();
         sinkThreadPoolSize = pulsarConfig.getThreadNum();
         if (sinkThreadPoolSize <= 0) {
@@ -250,12 +250,12 @@ public class PulsarSink extends AbstractSink implements Configurable,
                 }
             }
         });
-        configManager.getThirdPartyClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
+        configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
             @Override
             public void update() {
                 if (pulsarClientService != null) {
                     diffUpdatePulsarClient(pulsarClientService, pulsarCluster,
-                            configManager.getThirdPartyClusterUrl2Token());
+                            configManager.getMqClusterUrl2Token());
                 }
             }
         });
@@ -337,7 +337,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
         pulsarClientService.updatePulsarClients(this, needToClose, needToStart,
                 new HashSet<>(topicProperties.values()));
 
-        pulsarCluster = configManager.getThirdPartyClusterUrl2Token();
+        pulsarCluster = configManager.getMqClusterUrl2Token();
     }
 
     @Override
@@ -752,7 +752,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
         return currentInFlightCount;
     }
 
-    public ThirdPartyClusterConfig getPulsarConfig() {
+    public MQClusterConfig getPulsarConfig() {
         return pulsarConfig;
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
index 5e4d430d3..dbf823bd8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink;
 
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.inlong.common.metric.MetricRegister;
@@ -29,6 +25,10 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
 /**
  * SinkContext
  */
@@ -37,37 +37,33 @@ public class SinkContext {
     public static final Logger LOG = LoggerFactory.getLogger(SinkContext.class);
 
     public static final String KEY_MAX_THREADS = "maxThreads";
-    public static final String KEY_PROCESSINTERVAL = "processInterval";
-    public static final String KEY_RELOADINTERVAL = "reloadInterval";
+    public static final String KEY_PROCESS_INTERVAL = "processInterval";
+    public static final String KEY_RELOAD_INTERVAL = "reloadInterval";
 
     protected final String clusterId;
     protected final String sinkName;
     protected final Context sinkContext;
 
     protected final Channel channel;
-    //
+
     protected final int maxThreads;
     protected final long processInterval;
     protected final long reloadInterval;
-    //
+
     protected final DataProxyMetricItemSet metricItemSet;
     protected Timer reloadTimer;
 
     /**
      * Constructor
-     * 
-     * @param sinkName
-     * @param context
-     * @param channel
      */
     public SinkContext(String sinkName, Context context, Channel channel) {
         this.sinkName = sinkName;
         this.sinkContext = context;
         this.channel = channel;
-        this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+        this.clusterId = context.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
         this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
-        this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 100);
-        this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
+        this.processInterval = sinkContext.getInteger(KEY_PROCESS_INTERVAL, 100);
+        this.reloadInterval = sinkContext.getLong(KEY_RELOAD_INTERVAL, 60000L);
         //
         this.metricItemSet = new DataProxyMetricItemSet(sinkName);
         MetricRegister.register(this.metricItemSet);
@@ -118,7 +114,7 @@ public class SinkContext {
 
     /**
      * get clusterId
-     * 
+     *
      * @return the clusterId
      */
     public String getClusterId() {
@@ -127,7 +123,7 @@ public class SinkContext {
 
     /**
      * get sinkName
-     * 
+     *
      * @return the sinkName
      */
     public String getSinkName() {
@@ -136,7 +132,7 @@ public class SinkContext {
 
     /**
      * get sinkContext
-     * 
+     *
      * @return the sinkContext
      */
     public Context getSinkContext() {
@@ -145,7 +141,7 @@ public class SinkContext {
 
     /**
      * get channel
-     * 
+     *
      * @return the channel
      */
     public Channel getChannel() {
@@ -154,7 +150,7 @@ public class SinkContext {
 
     /**
      * get maxThreads
-     * 
+     *
      * @return the maxThreads
      */
     public int getMaxThreads() {
@@ -163,7 +159,7 @@ public class SinkContext {
 
     /**
      * get processInterval
-     * 
+     *
      * @return the processInterval
      */
     public long getProcessInterval() {
@@ -172,7 +168,7 @@ public class SinkContext {
 
     /**
      * get reloadInterval
-     * 
+     *
      * @return the reloadInterval
      */
     public long getReloadInterval() {
@@ -181,7 +177,7 @@ public class SinkContext {
 
     /**
      * get metricItemSet
-     * 
+     *
      * @return the metricItemSet
      */
     public DataProxyMetricItemSet getMetricItemSet() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index cf019ecb4..97dcaa210 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -34,7 +34,7 @@ import org.apache.flume.source.shaded.guava.RateLimiter;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
@@ -70,15 +70,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class TubeSink extends AbstractSink implements Configurable {
 
-    protected static final ConcurrentHashMap<String, Long> agentIdMap = new ConcurrentHashMap<String, Long>();
-
+    protected static final ConcurrentHashMap<String, Long> agentIdMap = new ConcurrentHashMap<>();
     private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
 
-    //    private static int BATCH_SIZE = 10000; //unused
-    private static final int sendNewMetricRetryCount = 3;
-    private static final String topicsFilePath = "topics.properties"; //unused
-    private static final String slaTopicFilePath = "slaTopics.properties";
-
     private static final LoadingCache<String, Long> agentIdCache = CacheBuilder
             .newBuilder().concurrencyLevel(4 * 8).initialCapacity(5000000)
             .expireAfterAccess(30, TimeUnit.SECONDS)
@@ -89,28 +83,27 @@ public class TubeSink extends AbstractSink implements Configurable {
                     return System.currentTimeMillis();
                 }
             });
+    private static final String TOPIC = "topic";
     protected static boolean idCleanerStarted = false;
-    private static String TOPIC = "topic";
-    private static ConcurrentHashMap<String, Long> illegalTopicMap = new ConcurrentHashMap<String, Long>();
+    private static ConcurrentHashMap<String, Long> illegalTopicMap = new ConcurrentHashMap<>();
     // key: masterUrl
     public Map<String, TubeMultiSessionFactory> sessionFactories;
     public Map<String, List<TopicProducerInfo>> masterUrl2producers;
     // key: topic
     public Map<String, List<TopicProducerInfo>> producerInfoMap;
-    public AtomicInteger currentPublishTopicNum = new AtomicInteger(0);
     private volatile boolean canTake = false;
     private volatile boolean canSend = false;
     private ConfigManager configManager;
     private Map<String, String> topicProperties;
-    private ThirdPartyClusterConfig tubeConfig;
+    private MQClusterConfig tubeConfig;
     private Set<String> masterHostAndPortLists;
+
     // used for RoundRobin different cluster while send message
     private AtomicInteger clusterIndex = new AtomicInteger(0);
     private LinkedBlockingQueue<EventStat> resendQueue;
     private LinkedBlockingQueue<Event> eventQueue;
     private RateLimiter diskRateLimiter;
     private Thread[] sinkThreadPool;
-    private String metaTopicFilePath = topicsFilePath;
     private Map<String, String> dimensions;
     private DataProxyMetricItemSet metricItemSet;
     private IdCacheCleaner idCacheCleaner;
@@ -123,9 +116,6 @@ public class TubeSink extends AbstractSink implements Configurable {
 
     /**
      * diff publish
-     *
-     * @param originalSet
-     * @param endSet
      */
     public void diffSetPublish(Set<String> originalSet, Set<String> endSet) {
         if (SetUtils.isEqualSet(originalSet, endSet)) {
@@ -155,6 +145,7 @@ public class TubeSink extends AbstractSink implements Configurable {
 
     /**
      * when masterUrlLists change, update tubeClient
+     *
      * @param originalCluster previous masterHostAndPortList set
      * @param endCluster new masterHostAndPortList set
      */
@@ -202,30 +193,22 @@ public class TubeSink extends AbstractSink implements Configurable {
         // start new client
         for (String masterUrl : endCluster) {
             if (!originalCluster.contains(masterUrl)) {
-
                 TubeMultiSessionFactory sessionFactory = createConnection(masterUrl);
-
                 if (sessionFactory != null) {
                     List<Set<String>> topicGroups = partitionTopicSet(new HashSet<>(topicProperties.values()));
                     for (Set<String> topicSet : topicGroups) {
                         createTopicProducers(masterUrl, sessionFactory, topicSet);
                     }
-
                     logger.info("successfully start new tubeClient for the new masterList: {}", masterUrl);
                 }
             }
         }
 
-        masterHostAndPortLists = configManager.getThirdPartyClusterUrl2Token().keySet();
+        masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
     }
 
     /**
      * when there are multi clusters, pick producer based on round-robin
-     *
-     * @param topic
-     * @return
-     *
-     * @throws TubeClientException
      */
     private MessageProducer getProducer(String topic) throws TubeClientException {
         if (producerInfoMap.containsKey(topic) && !producerInfoMap.get(topic).isEmpty()) {
@@ -299,17 +282,10 @@ public class TubeSink extends AbstractSink implements Configurable {
             TubeClientConfig conf = initTubeConfig(masterHostAndPortList);
             sessionFactory = new TubeMultiSessionFactory(conf);
             sessionFactories.put(masterHostAndPortList, sessionFactory);
-        } catch (TubeClientException e) {
-            logger.error("create connnection error in tubesink, "
-                    + "maybe tube master set error, please re-check. ex1 {}", e.getMessage());
-            throw new FlumeException("connect to Tube error1, "
-                    + "maybe zkstr/zkroot set error, please re-check");
         } catch (Throwable e) {
-            logger.error("create connnection error in tubesink, "
-                            + "maybe tube master set error/shutdown in progress, please re-check. ex2 {}",
-                    e.getMessage());
-            throw new FlumeException("connect to meta error2, "
-                    + "maybe tube master set error/shutdown in progress, please re-check");
+            logger.error("connect to tube meta error, maybe tube master set error/shutdown, please re-check", e);
+            throw new FlumeException("connect to tube meta error, maybe tube master set error/shutdown in progress, "
+                    + "please re-check");
         }
         return sessionFactory;
     }
@@ -326,11 +302,8 @@ public class TubeSink extends AbstractSink implements Configurable {
             for (TubeMultiSessionFactory sessionFactory : sessionFactories.values()) {
                 try {
                     sessionFactory.shutdown();
-                } catch (TubeClientException e) {
-                    logger.error("destroy sessionFactory error in tubesink, MetaClientException {}",
-                            e.getMessage());
                 } catch (Exception e) {
-                    logger.error("destroy sessionFactory error in tubesink, ex {}", e.getMessage());
+                    logger.error("destroy sessionFactory error in tubesink: ", e);
                 }
             }
         }
@@ -342,14 +315,11 @@ public class TubeSink extends AbstractSink implements Configurable {
     /**
      * partition topicSet to different group, each group is associated with a producer;
      * if there are multi clusters, then each group is associated with a set of producer
-     *
-     * @param topicSet
-     * @return
      */
     private List<Set<String>> partitionTopicSet(Set<String> topicSet) {
         List<Set<String>> topicGroups = new ArrayList<>();
 
-        List<String> sortedList = new ArrayList(topicSet);
+        List<String> sortedList = new ArrayList<>(topicSet);
         Collections.sort(sortedList);
         int maxTopicsEachProducerHolder = tubeConfig.getMaxTopicsEachProducerHold();
         int cycle = sortedList.size() / maxTopicsEachProducerHolder;
@@ -357,7 +327,7 @@ public class TubeSink extends AbstractSink implements Configurable {
 
         for (int i = 0; i <= cycle; i++) {
             // allocate topic
-            Set<String> subset = new HashSet<String>();
+            Set<String> subset = new HashSet<>();
             int startIndex = i * maxTopicsEachProducerHolder;
             int endIndex = startIndex + maxTopicsEachProducerHolder - 1;
             if (i == cycle) {
@@ -378,10 +348,6 @@ public class TubeSink extends AbstractSink implements Configurable {
 
     /**
      * create producer and publish topic
-     *
-     * @param masterUrl
-     * @param sessionFactory
-     * @param topicGroup
      */
     private void createTopicProducers(String masterUrl, TubeMultiSessionFactory sessionFactory,
             Set<String> topicGroup) {
@@ -420,20 +386,18 @@ public class TubeSink extends AbstractSink implements Configurable {
         this.dimensions = new HashMap<>();
         this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
         this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
-        //register metrics
+        // register metrics
         this.metricItemSet = new DataProxyMetricItemSet(this.getName());
         MetricRegister.register(metricItemSet);
 
-        //create tube connection
+        // create tube connection
         try {
             initCreateConnection();
         } catch (FlumeException e) {
             logger.error("Unable to create tube client" + ". Exception follows.", e);
-
-            /* Try to prevent leaking resources. */
+            // Try to prevent leaking resources
             destroyConnection();
-
-            /* FIXME: Mark ourselves as failed. */
+            // FIXME: Mark ourselves as failed
             stop();
             return;
         }
@@ -463,9 +427,6 @@ public class TubeSink extends AbstractSink implements Configurable {
 
     /**
      * resend event
-     *
-     * @param es
-     * @param isDecrement
      */
     private void resendEvent(EventStat es, boolean isDecrement) {
         try {
@@ -512,8 +473,7 @@ public class TubeSink extends AbstractSink implements Configurable {
                     dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
                             event.getHeaders().get(TOPIC));
                 } else {
-                    dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                            "");
+                    dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
                 }
                 if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
                     logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
@@ -531,7 +491,6 @@ public class TubeSink extends AbstractSink implements Configurable {
                     metricItem.readFailSize.addAndGet(event.getBody().length);
                 }
             } else {
-
                 // logger.info("[{}]No data to process in the channel.",getName());
                 status = Status.BACKOFF;
                 tx.commit();
@@ -541,8 +500,7 @@ public class TubeSink extends AbstractSink implements Configurable {
             try {
                 tx.rollback();
             } catch (Throwable e) {
-                logger.error("metasink transaction rollback exception", e);
-
+                logger.error("meta sink transaction rollback exception", e);
             }
         } finally {
             tx.close();
@@ -552,37 +510,28 @@ public class TubeSink extends AbstractSink implements Configurable {
 
     @Override
     public void configure(Context context) {
-        logger.info(context.toString());
-//        logger.info("sinktest:"+getName()+getChannel());//sinktest:meta-sink-msg2null
+        logger.info("configure from context: {}", context);
 
         configManager = ConfigManager.getInstance();
         topicProperties = configManager.getTopicProperties();
-        masterHostAndPortLists = configManager.getThirdPartyClusterUrl2Token().keySet();
-        Preconditions.checkState(masterHostAndPortLists != null || masterHostAndPortLists.isEmpty(),
-                "No master and port list specified");
-        tubeConfig = configManager.getThirdPartyClusterConfig();
+        masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
+        tubeConfig = configManager.getMqClusterConfig();
         configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
             @Override
             public void update() {
-
-                diffSetPublish(new HashSet<String>(topicProperties.values()),
-                        new HashSet<String>(configManager.getTopicProperties().values()));
+                diffSetPublish(new HashSet<>(topicProperties.values()),
+                        new HashSet<>(configManager.getTopicProperties().values()));
             }
         });
-        configManager.getThirdPartyClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
+        configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
             @Override
             public void update() {
-                diffUpdateTubeClient(masterHostAndPortLists, configManager.getThirdPartyClusterUrl2Token().keySet());
+                diffUpdateTubeClient(masterHostAndPortLists, configManager.getMqClusterUrl2Token().keySet());
             }
         });
 
         producerInfoMap = new ConcurrentHashMap<>();
         masterUrl2producers = new ConcurrentHashMap<>();
-
-        if (tubeConfig.getEnableSlaMetricSink()) {
-            this.metaTopicFilePath = slaTopicFilePath;
-        }
-
         clientIdCache = tubeConfig.getClientIdCache();
         if (clientIdCache) {
             int survivedTime = tubeConfig.getMaxSurvivedTime();
@@ -609,7 +558,7 @@ public class TubeSink extends AbstractSink implements Configurable {
         sinkThreadPool = new Thread[threadNum];
         int eventQueueSize = tubeConfig.getEventQueueSize();
         Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0");
-        eventQueue = new LinkedBlockingQueue<Event>(eventQueueSize);
+        eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
 
         if (tubeConfig.getDiskIoRatePerSec() != 0) {
             diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec());
@@ -617,8 +566,8 @@ public class TubeSink extends AbstractSink implements Configurable {
 
     }
 
-    private Map getNewDimension(String otherKey, String value) {
-        Map dimensions = new HashMap<>();
+    private Map<String, String> getNewDimension(String otherKey, String value) {
+        Map<String, String> dimensions = new HashMap<>();
         dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
         dimensions.put(otherKey, value);
@@ -659,7 +608,6 @@ public class TubeSink extends AbstractSink implements Configurable {
 
                     producer.sendMessage(message, new MyCallback(es));
                     flag.set(true);
-
                 }
             } else {
                 boolean hasKey = false;
@@ -715,7 +663,7 @@ public class TubeSink extends AbstractSink implements Configurable {
 
         @Override
         public void run() {
-            logger.info("Sink task {} started.", Thread.currentThread().getName());
+            logger.info("sink task {} started.", Thread.currentThread().getName());
             while (canSend) {
                 boolean decrementFlag = false;
                 boolean resendBadEvent = false;
@@ -761,12 +709,10 @@ public class TubeSink extends AbstractSink implements Configurable {
                     if (expireTime != null) {
                         long currentTime = System.currentTimeMillis();
                         if (expireTime > currentTime) {
-
                             // TODO: need to be improved.
 //                            reChannelEvent(es, topic);
                             continue;
                         } else {
-
                             illegalTopicMap.remove(topic);
                         }
                     }
@@ -785,7 +731,6 @@ public class TubeSink extends AbstractSink implements Configurable {
                     AtomicBoolean flagAtomic = new AtomicBoolean(decrementFlag);
                     sendMessage(producer, event, topic, flagAtomic, es);
                     decrementFlag = flagAtomic.get();
-
                 } catch (InterruptedException e) {
                     logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
                     return;
@@ -832,10 +777,6 @@ public class TubeSink extends AbstractSink implements Configurable {
 
         /**
          * addMetric
-         *
-         * @param event
-         * @param result
-         * @param sendTime
          */
         private void addMetric(Event event, boolean result, long sendTime) {
             Map<String, String> dimensions = new HashMap<>();
@@ -893,10 +834,8 @@ public class TubeSink extends AbstractSink implements Configurable {
             if (producer != null) {
                 try {
                     producer.shutdown();
-                } catch (TubeClientException e) {
-                    logger.error("destroy producer error in tubesink, MetaClientException {}", e.getMessage());
                 } catch (Throwable e) {
-                    logger.error("destroy producer error in tubesink, ex {}", e.getMessage());
+                    logger.error("destroy producer error in tube sink", e);
                 }
             }
         }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index c3ed19b7e..824b7a5d9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -26,7 +26,7 @@ import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
 import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
 import org.apache.inlong.dataproxy.base.OrderEvent;
 import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -99,7 +99,7 @@ public class PulsarClientService {
      *
      * @param pulsarConfig
      */
-    public PulsarClientService(ThirdPartyClusterConfig pulsarConfig, int sinkThreadPoolSize) {
+    public PulsarClientService(MQClusterConfig pulsarConfig, int sinkThreadPoolSize) {
 
         this.sinkThreadPoolSize = sinkThreadPoolSize;
 
@@ -288,7 +288,7 @@ public class PulsarClientService {
             return;
         }
         pulsarClients = new ConcurrentHashMap<>();
-        pulsarUrl2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+        pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
         Preconditions.checkState(!pulsarUrl2token.isEmpty(), "No pulsar server url specified");
         logger.debug("number of pulsar cluster is {}", pulsarUrl2token.size());
         for (Map.Entry<String, String> info : pulsarUrl2token.entrySet()) {
@@ -328,7 +328,7 @@ public class PulsarClientService {
 
     private PulsarClient initPulsarClient(String pulsarUrl, String token) throws Exception {
         ClientBuilder builder = PulsarClient.builder();
-        if (ThirdPartyClusterConfig.PULSAR_DEFAULT_AUTH_TYPE.equals(authType) && StringUtils.isNotEmpty(token)) {
+        if (MQClusterConfig.PULSAR_DEFAULT_AUTH_TYPE.equals(authType) && StringUtils.isNotEmpty(token)) {
             builder.authentication(AuthenticationFactory.token(token));
         }
         builder.serviceUrl(pulsarUrl)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
index 3157f0dc8..2b44a2d88 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Event;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.sink.EventStat;
@@ -66,7 +66,7 @@ public class SinkTask extends Thread {
 
     private LoadingCache<String, Long>  agentIdCache;
 
-    private ThirdPartyClusterConfig pulsarConfig;
+    private MQClusterConfig pulsarConfig;
 
     private int maxRetrySendCnt;
     /*
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestThirdPartyClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMQClusterConfigLoader.java
similarity index 75%
rename from inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestThirdPartyClusterConfigLoader.java
rename to inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMQClusterConfigLoader.java
index 45ee12f33..67cf12f69 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestThirdPartyClusterConfigLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMQClusterConfigLoader.java
@@ -15,41 +15,44 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.dataproxy.config.loader;
+package org.apache.inlong.dataproxy.config.holder;
 
 import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-public class TestThirdPartyClusterConfigLoader {
+/**
+ * Test for {@link MQClusterConfigHolder}
+ */
+public class TestMQClusterConfigLoader {
 
     @Test
     public void testUpdateUrl() {
-        Map<String, String> url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+        Map<String, String> url2token = ConfigManager.getInstance().getMqClusterUrl2Token();
         Assert.assertEquals(url2token.size(), 3);
         Map<String, String> newUrl = new HashMap<>();
         newUrl.put("127.0.0.1:8088", "test");
-        ConfigManager.getInstance().getThirdPartyClusterHolder().setUrl2token(newUrl);
-        url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+        ConfigManager.getInstance().getMqClusterHolder().setUrl2token(newUrl);
+        url2token = ConfigManager.getInstance().getMqClusterUrl2Token();
         Assert.assertEquals(newUrl, url2token);
     }
 
     @Test
     public void testCommonConfig() {
-        ThirdPartyClusterConfig config = ConfigManager.getInstance().getThirdPartyClusterConfig();
+        MQClusterConfig config = ConfigManager.getInstance().getMqClusterConfig();
         Assert.assertEquals(config.getAuthType(), "token");
         Assert.assertEquals(config.getThreadNum(), 5);
-        Assert.assertEquals(config.getEnableBatch(), true);
+        Assert.assertTrue(config.getEnableBatch());
 
         Map<String, String> newConfig = new HashMap<>();
         newConfig.put("thread_num", "10");
         newConfig.put("disk_io_rate_per_sec", "60000");
-        ConfigManager.getInstance().getThirdPartyClusterConfig().putAll(newConfig);
-        config = ConfigManager.getInstance().getThirdPartyClusterConfig();
+        ConfigManager.getInstance().getMqClusterConfig().putAll(newConfig);
+        config = ConfigManager.getInstance().getMqClusterConfig();
         Assert.assertEquals(config.getAuthType(), "token");
         Assert.assertEquals(config.getThreadNum(), 10);
         Assert.assertEquals(config.getDiskIoRatePerSec(), 60000);
@@ -57,16 +60,16 @@ public class TestThirdPartyClusterConfigLoader {
 
     @Test
     public void testTubeUrl() {
-        Map<String, String> url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+        Map<String, String> url2token = ConfigManager.getInstance().getMqClusterUrl2Token();
         Assert.assertEquals(url2token.size(), 3);
         Assert.assertEquals("", url2token.get("127.0.0.1:8080,127.0.0.1:8088"));
     }
 
     @Test
     public void testPulsarUrl() {
-        Map<String, String> url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+        Map<String, String> url2token = ConfigManager.getInstance().getMqClusterUrl2Token();
         Assert.assertEquals("pulsartoken1", url2token.get("pulsar1://127.0.0.1:6650,pulsar2://127.0.0.1:6600"));
         Assert.assertEquals("pulsartoken2", url2token.get("pulsar2://127.0.0.1:6680"));
-
     }
+
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
index 6ec998fbc..77773cffc 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,27 +17,24 @@
 
 package org.apache.inlong.dataproxy.config.loader;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.inlong.common.metric.MetricListener;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.junit.Test;
 
 import java.util.Map;
 
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.common.metric.MetricListener;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 /**
- * 
- * TestClassResourceCommonPropertiesLoader
+ * Test for {@link ClassResourceCommonPropertiesLoader}
  */
 public class TestClassResourceCommonPropertiesLoader {
 
     /**
      * testResult
-     * 
-     * @throws Exception
      */
     @Test
-    public void testResult() throws Exception {
+    public void testResult() {
         // increase source
         ClassResourceCommonPropertiesLoader loader = new ClassResourceCommonPropertiesLoader();
         Map<String, String> props = loader.load();
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
index 1501f6371..12bf36991 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,30 +17,29 @@
 
 package org.apache.inlong.dataproxy.config.loader;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
+import java.net.URL;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertEquals;
 
 /**
- * 
- * TestContextCacheClusterConfigLoader
+ * Test for {@link ContextCacheClusterConfigLoader}
  */
 public class TestContextCacheClusterConfigLoader {
 
     public static final Logger LOG = LoggerFactory.getLogger(TestContextCacheClusterConfigLoader.class);
-    private static Context context;
     private static Context sinkContext;
 
     /**
@@ -48,19 +47,17 @@ public class TestContextCacheClusterConfigLoader {
      */
     @BeforeClass
     public static void setup() {
-        Map<String, String> result = new ConcurrentHashMap<>();
-        try (InputStream inStream = TestContextCacheClusterConfigLoader.class.getClassLoader().getResource(
-                "dataproxy-pulsar.conf")
-                .openStream()) {
+        URL resource = TestContextCacheClusterConfigLoader.class.getClassLoader().getResource("dataproxy-pulsar.conf");
+        try (InputStream inStream = Objects.requireNonNull(resource).openStream()) {
             Properties props = new Properties();
             props.load(inStream);
+
+            Map<String, String> result = new ConcurrentHashMap<>();
             for (Map.Entry<Object, Object> entry : props.entrySet()) {
                 result.put((String) entry.getKey(), (String) entry.getValue());
             }
-            context = new Context(result);
+            Context context = new Context(result);
             sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
         } catch (Exception e) {
             LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
         }
@@ -68,11 +65,9 @@ public class TestContextCacheClusterConfigLoader {
 
     /**
      * testResult
-     * 
-     * @throws Exception
      */
     @Test
-    public void testResult() throws Exception {
+    public void testResult() {
         ContextCacheClusterConfigLoader loader = new ContextCacheClusterConfigLoader();
         loader.configure(sinkContext);
         List<CacheClusterConfig> configList = loader.load();
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
index 376717bc6..9cca96efd 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,30 +17,29 @@
 
 package org.apache.inlong.dataproxy.config.loader;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
+import java.net.URL;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertEquals;
 
 /**
- * 
- * TestContextCacheClusterConfigLoader
+ * Test for {@link ContextCacheClusterConfigLoader}
  */
 public class TestContextIdTopicConfigLoader {
 
     public static final Logger LOG = LoggerFactory.getLogger(TestContextIdTopicConfigLoader.class);
-    private static Context context;
     private static Context sinkContext;
 
     /**
@@ -48,19 +47,16 @@ public class TestContextIdTopicConfigLoader {
      */
     @BeforeClass
     public static void setup() {
-        Map<String, String> result = new ConcurrentHashMap<>();
-        try (InputStream inStream = TestContextIdTopicConfigLoader.class.getClassLoader().getResource(
-                "dataproxy-pulsar.conf")
-                .openStream()) {
+        URL resource = TestContextIdTopicConfigLoader.class.getClassLoader().getResource("dataproxy-pulsar.conf");
+        try (InputStream inStream = Objects.requireNonNull(resource).openStream()) {
             Properties props = new Properties();
             props.load(inStream);
+            Map<String, String> result = new ConcurrentHashMap<>();
             for (Map.Entry<Object, Object> entry : props.entrySet()) {
                 result.put((String) entry.getKey(), (String) entry.getValue());
             }
-            context = new Context(result);
+            Context context = new Context(result);
             sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
         } catch (Exception e) {
             LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
         }
@@ -68,11 +64,9 @@ public class TestContextIdTopicConfigLoader {
 
     /**
      * testResult
-     * 
-     * @throws Exception
      */
     @Test
-    public void testResult() throws Exception {
+    public void testResult() {
         ContextIdTopicConfigLoader loader = new ContextIdTopicConfigLoader();
         loader.configure(sinkContext);
         List<IdTopicConfig> configList = loader.load();
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
index 7729deac9..af0e03ae1 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
index 32e5a2163..e072f150f 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
index a074fe519..8996931ec 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
index a372d7daa..7fd6fee7c 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -1,22 +1,21 @@
 #
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
 #
-#   http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 #
+
 proxy.cluster.name=proxy_inlong5th_sz
 metricDomains=DataProxy
 metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
-metricDomains.DataProxy.snapshotInterval=60000
\ No newline at end of file
+metricDomains.DataProxy.snapshotInterval=60000
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/dataproxy-pulsar.conf b/inlong-dataproxy/dataproxy-source/src/test/resources/dataproxy-pulsar.conf
index d472f5146..33da0b457 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/dataproxy-pulsar.conf
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/dataproxy-pulsar.conf
@@ -1,21 +1,20 @@
 #
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
 #
-#   http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 #
+
 proxy_inlong5th_sz.channels=ch-msg1
 proxy_inlong5th_sz.sinks=pulsar-sink-more1
 proxy_inlong5th_sz.sources=agent-source sdk-source
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/third_party_cluster.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/mq_cluster.properties
similarity index 77%
rename from inlong-dataproxy/dataproxy-source/src/test/resources/third_party_cluster.properties
rename to inlong-dataproxy/dataproxy-source/src/test/resources/mq_cluster.properties
index 9ecb8b09b..a49e21048 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/third_party_cluster.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/mq_cluster.properties
@@ -15,10 +15,9 @@
 # limitations under the License.
 #
 
-third-party-cluster.index1=pulsar1://127.0.0.1:6650,pulsar2://127.0.0.1:6600=pulsartoken1
-third-party-cluster.index2=pulsar2://127.0.0.1:6680=pulsartoken2
-
-third-party-cluster.index3=127.0.0.1:8080,127.0.0.1:8088=
+mq_cluster.index1=pulsar1://127.0.0.1:6650,pulsar2://127.0.0.1:6600=pulsartoken1
+mq_cluster.index2=pulsar2://127.0.0.1:6680=pulsartoken2
+mq_cluster.index3=127.0.0.1:8080,127.0.0.1:8088=
 
 pulsar_auth_type=token
 thread_num=5
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/topics.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/topics.properties
index e1cac2480..76fd0d4f0 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/topics.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/topics.properties
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-b_docker_test_1.docker_test_1=persistent://public/b_docker_test_1/docker_test_1
\ No newline at end of file
+b_docker_test_1.docker_test_1=persistent://public/b_docker_test_1/docker_test_1
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
index 06a3a64bd..9405505c7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.core;
 
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
@@ -143,7 +143,7 @@ public interface InlongClusterService {
      * @param clusterName cluster name
      * @return data proxy config, includes mq clusters and topics
      */
-    ThirdPartyClusterDTO getDataProxyConfig(String clusterTag, String clusterName);
+    DataProxyConfig getDataProxyConfig(String clusterTag, String clusterName);
 
     /**
      * Get data proxy cluster list by the given cluster name
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
index dd5300a05..25f278053 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
@@ -27,8 +27,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterInfo;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GlobalConstants;
@@ -339,7 +339,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
     }
 
     @Override
-    public ThirdPartyClusterDTO getDataProxyConfig(String clusterTag, String clusterName) {
+    public DataProxyConfig getDataProxyConfig(String clusterTag, String clusterName) {
         LOGGER.debug("GetDPConfig: begin to get config by cluster tag={} name={}", clusterTag, clusterName);
 
         // get all data proxy clusters
@@ -349,7 +349,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
                 .type(ClusterType.DATA_PROXY.getType())
                 .build();
         List<InlongClusterEntity> clusterList = clusterMapper.selectByCondition(request);
-        ThirdPartyClusterDTO result = new ThirdPartyClusterDTO();
+        DataProxyConfig result = new DataProxyConfig();
         if (CollectionUtils.isEmpty(clusterList)) {
             LOGGER.warn("GetDPConfig: data proxy cluster not found by tag={} name={}", clusterTag, clusterName);
             return result;
@@ -372,7 +372,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
 
         LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated group num={}",
                 clusterTagList, groupList.size());
-        List<DataProxyConfig> topicList = new ArrayList<>();
+        List<DataProxyTopicInfo> topicList = new ArrayList<>();
         for (InlongGroupBriefInfo groupInfo : groupList) {
             String groupId = groupInfo.getInlongGroupId();
             String mqResource = groupInfo.getMqResource();
@@ -400,13 +400,13 @@ public class InlongClusterServiceImpl implements InlongClusterService {
                     String streamId = streamInfo.getInlongStreamId();
                     String topic = String.format(InlongGroupSettings.PULSAR_TOPIC_FORMAT,
                             tenant, mqResource, streamInfo.getMqResource());
-                    DataProxyConfig topicConfig = new DataProxyConfig();
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
                     topicConfig.setInlongGroupId(groupId + "/" + streamId);
                     topicConfig.setTopic(topic);
                     topicList.add(topicConfig);
                 }
             } else if (type == MQType.TUBE) {
-                DataProxyConfig topicConfig = new DataProxyConfig();
+                DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
                 topicConfig.setInlongGroupId(groupId);
                 topicConfig.setTopic(mqResource);
                 topicList.add(topicConfig);
@@ -415,7 +415,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
 
         // get mq cluster info
         LOGGER.debug("GetDPConfig: begin to get mq clusters by tags={}", clusterTagList);
-        List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
+        List<MQClusterInfo> mqSet = new ArrayList<>();
         List<String> typeList = Arrays.asList(ClusterType.TUBE.getType(), ClusterType.PULSAR.getType());
         InlongClusterPageRequest pageRequest = InlongClusterPageRequest.builder()
                 .typeList(typeList)
@@ -423,7 +423,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
                 .build();
         List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(pageRequest);
         for (InlongClusterEntity cluster : mqClusterList) {
-            ThirdPartyClusterInfo clusterInfo = new ThirdPartyClusterInfo();
+            MQClusterInfo clusterInfo = new MQClusterInfo();
             clusterInfo.setUrl(cluster.getUrl());
             clusterInfo.setToken(cluster.getToken());
             Map<String, String> configParams = GSON.fromJson(cluster.getExtParams(), Map.class);
@@ -431,7 +431,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
             mqSet.add(clusterInfo);
         }
 
-        result.setMqSet(mqSet);
+        result.setMqClusterList(mqSet);
         result.setTopicList(topicList);
 
         return result;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
index 70dfe7f52..ddb11af17 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
@@ -41,10 +41,8 @@ public class PulsarUtils {
     /**
      * Get pulsar admin info
      */
-    public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarClusterInfo)
-            throws PulsarClientException {
-        Preconditions.checkNotNull(pulsarClusterInfo.getAdminUrl(), "pulsar adminUrl is empty, "
-                + "check third party cluster table");
+    public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarClusterInfo) throws PulsarClientException {
+        Preconditions.checkNotNull(pulsarClusterInfo.getAdminUrl(), "Pulsar adminUrl cannot be empty");
         PulsarAdmin pulsarAdmin;
         if (StringUtils.isEmpty(pulsarClusterInfo.getToken())) {
             pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl());
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 3c3701a8f..f3993a79a 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -21,7 +21,8 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyNodeInfo;
 import org.apache.inlong.manager.service.core.InlongClusterService;
@@ -68,14 +69,14 @@ public class DataProxyController {
             @ApiImplicitParam(name = "clusterTag", value = "cluster tag", dataTypeClass = String.class),
             @ApiImplicitParam(name = "clusterName", value = "cluster name", dataTypeClass = String.class)
     })
-    public Response<ThirdPartyClusterDTO> getConfig(
+    public Response<DataProxyConfig> getConfig(
             @RequestParam(required = false) String clusterTag,
             @RequestParam(required = true) String clusterName) {
-        ThirdPartyClusterDTO dto = clusterService.getDataProxyConfig(clusterTag, clusterName);
-        if (dto.getMqSet().isEmpty() || dto.getTopicList().isEmpty()) {
+        DataProxyConfig config = clusterService.getDataProxyConfig(clusterTag, clusterName);
+        if (CollectionUtils.isEmpty(config.getMqClusterList()) || CollectionUtils.isEmpty(config.getTopicList())) {
             return Response.fail("failed to get mq clusters or topics");
         }
-        return Response.success(dto);
+        return Response.success(config);
     }
 
     @GetMapping("/dataproxy/getAllConfig")