You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/23 04:08:26 UTC

[inlong] branch master updated: [INLONG-5160][DataPorxySDK] Update DataProxy cluster API and response parsing method (#5177)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ed4d33e13 [INLONG-5160][DataPorxySDK] Update DataProxy cluster API and response parsing method (#5177)
ed4d33e13 is described below

commit ed4d33e136d0787af8c47eba54a0a2f0a2a4901a
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Sat Jul 23 12:08:20 2022 +0800

    [INLONG-5160][DataPorxySDK] Update DataProxy cluster API and response parsing method (#5177)
---
 .../inlong/sdk/dataproxy/ConfigConstants.java      |   2 +
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 263 ++++++++++---------
 .../inlong/sdk/dataproxy/ProxyClientConfig.java    |   5 +-
 .../sdk/dataproxy/config/ProxyClusterConfig.java   |  39 +++
 .../sdk/dataproxy/config/ProxyConfigEntry.java     |  30 +--
 .../sdk/dataproxy/config/ProxyConfigManager.java   | 284 ++++++++-------------
 .../inlong/sdk/dataproxy/network/Sender.java       |   6 +-
 .../sdk/dataproxy/ProxyConfigManagerTest.java      |  55 ++++
 .../src/test/resources/proxylist.json              |  18 ++
 9 files changed, 362 insertions(+), 340 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index ead7cffbd..988b50429 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -67,4 +67,6 @@ public class ConfigConstants {
     public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
     public static final int FLAG_ALLOW_COMPRESS = 1 << 5;
 
+    public static final String MANAGER_DATAPROXY_API = "/api/inlong/manager/openapi/dataproxy/getIpList/";
+
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index b31e88e29..e5e3496cd 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -18,13 +18,6 @@
 
 package org.apache.inlong.sdk.dataproxy;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
@@ -38,12 +31,27 @@ import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public class DefaultMessageSender implements MessageSender {
-    private static final Logger logger = LoggerFactory.getLogger(DefaultMessageSender.class);
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageSender.class);
     private static final long DEFAULT_SEND_TIMEOUT = 100;
     private static final TimeUnit DEFAULT_SEND_TIMEUNIT = TimeUnit.MILLISECONDS;
+    private static final ConcurrentHashMap<Integer, DefaultMessageSender> CACHE_SENDER =
+            new ConcurrentHashMap<>();
+    private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new AtomicBoolean(false);
+    private static ManagerFetcherThread managerFetcherThread;
     private final Sender sender;
     private final SequentialID idGenerator;
+    private final IndexCollectThread indexCol;
+    /* Store index <groupId_streamId,cnt>*/
+    private final Map<String, Long> storeIndex = new ConcurrentHashMap<String, Long>();
     private String groupId;
     private int msgtype = ConfigConstants.MSG_TYPE;
     private boolean isCompress = true;
@@ -52,16 +60,71 @@ public class DefaultMessageSender implements MessageSender {
     private boolean isSupportLF = false;
     private int cpsSize = ConfigConstants.COMPRESS_SIZE;
 
-    private static final ConcurrentHashMap<String, DefaultMessageSender> cacheSender =
-            new ConcurrentHashMap<>();
+    public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
+        this(configure, null);
+    }
 
-    private final IndexCollectThread indexCol;
-    /* Store index <groupId_streamId,cnt>*/
-    private final Map<String, Long> storeIndex = new ConcurrentHashMap<String, Long>();
+    public DefaultMessageSender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) throws Exception {
+        ProxyUtils.validClientConfig(configure);
+        sender = new Sender(configure, selfDefineFactory);
+        idGenerator = new SequentialID(Utils.getLocalIp());
+        groupId = configure.getGroupId();
+        indexCol = new IndexCollectThread(storeIndex);
+        indexCol.start();
+
+        if (configure.isEnableSaveManagerVIps()
+                && configure.isLocalVisit()
+                && MANAGER_FETCHER_THREAD_STARTED.compareAndSet(false, true)) {
+            managerFetcherThread = new ManagerFetcherThread(configure);
+            managerFetcherThread.start();
+        }
+    }
+
+    /**
+     * generate by cluster id
+     *
+     * @param configure - sender
+     * @return - sender
+     */
+    public static DefaultMessageSender generateSenderByClusterId(
+            ProxyClientConfig configure) throws Exception {
 
-    private static final AtomicBoolean ManagerFetcherThreadStarted = new AtomicBoolean(false);
+        return generateSenderByClusterId(configure, null);
+    }
 
-    private static ManagerFetcherThread managerFetcherThread;
+    /**
+     * generate by cluster id
+     *
+     * @param configure - sender
+     * @param selfDefineFactory - sender factory
+     * @return - sender
+     */
+    public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure,
+            ThreadFactory selfDefineFactory) throws Exception {
+        ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
+                Utils.getLocalIp(), null);
+        proxyConfigManager.setGroupId(configure.getGroupId());
+        ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
+        DefaultMessageSender sender = CACHE_SENDER.get(entry.getClusterId());
+        if (sender != null) {
+            return sender;
+        } else {
+            DefaultMessageSender tmpMessageSender =
+                    new DefaultMessageSender(configure, selfDefineFactory);
+            CACHE_SENDER.put(entry.getClusterId(), tmpMessageSender);
+            return tmpMessageSender;
+        }
+    }
+
+    /**
+     * finally clean up
+     */
+    public static void finallyCleanup() {
+        for (DefaultMessageSender sender : CACHE_SENDER.values()) {
+            sender.close();
+        }
+        CACHE_SENDER.clear();
+    }
 
     public boolean isSupportLF() {
         return isSupportLF;
@@ -119,85 +182,19 @@ public class DefaultMessageSender implements MessageSender {
         this.groupId = groupId;
     }
 
-    public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
-        this(configure, null);
-    }
-
-    public DefaultMessageSender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) throws Exception {
-        ProxyUtils.validClientConfig(configure);
-        sender = new Sender(configure, selfDefineFactory);
-        idGenerator = new SequentialID(Utils.getLocalIp());
-        groupId = configure.getGroupId();
-        indexCol = new IndexCollectThread(storeIndex);
-        indexCol.start();
-
-        if (configure.isEnableSaveManagerVIps()
-                && configure.isLocalVisit()
-                && ManagerFetcherThreadStarted.compareAndSet(false, true)) {
-            managerFetcherThread = new ManagerFetcherThread(configure);
-            managerFetcherThread.start();
-        }
-    }
-
-    /**
-     * generate by cluster id
-     *
-     * @param configure - sender
-     * @return - sender
-     */
-    public static DefaultMessageSender generateSenderByClusterId(
-            ProxyClientConfig configure) throws Exception {
-
-        return generateSenderByClusterId(configure, null);
-    }
-
-    /**
-     *  generate by cluster id
-     *
-     * @param configure         - sender
-     * @param selfDefineFactory - sender factory
-     * @return - sender
-     */
-    public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure,
-            ThreadFactory selfDefineFactory) throws Exception {
-        ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
-                Utils.getLocalIp(), null);
-        proxyConfigManager.setGroupId(configure.getGroupId());
-        ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
-        DefaultMessageSender sender = cacheSender.get(entry.getClusterId());
-        if (sender != null) {
-            return sender;
-        } else {
-            DefaultMessageSender tmpMessageSender =
-                    new DefaultMessageSender(configure, selfDefineFactory);
-            cacheSender.put(entry.getClusterId(), tmpMessageSender);
-            return tmpMessageSender;
-        }
-    }
-
-    /**
-     * finally clean up
-     */
-    public static void finallyCleanup() {
-        for (DefaultMessageSender sender : cacheSender.values()) {
-            sender.close();
-        }
-        cacheSender.clear();
-    }
-
     public String getSDKVersion() {
         return ConfigConstants.PROXY_SDK_VERSION;
     }
 
     @Deprecated
     public SendResult sendMessage(byte[] body, String attributes, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
         return sender.syncSendMessage(new EncodeObject(body, attributes,
                 idGenerator.getNextId()), msgUUID, timeout, timeUnit);
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
@@ -227,7 +224,7 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
 
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -261,7 +258,7 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
@@ -289,10 +286,10 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-            || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
         addIndexCnt(groupId, streamId, bodyList.size());
@@ -321,14 +318,14 @@ public class DefaultMessageSender implements MessageSender {
 
     @Deprecated
     public void asyncSendMessage(SendMessageCallback callback, byte[] body, String attributes, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            long timeout, TimeUnit timeUnit) throws ProxysdkException {
         sender.asyncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()),
                 callback, msgUUID, timeout, timeUnit);
     }
 
     public void asyncSendMessage(SendMessageCallback callback, byte[] body,
-                                 String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -345,8 +342,8 @@ public class DefaultMessageSender implements MessageSender {
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompressEnd) {
                 sender.asyncSendMessage(new EncodeObject(body, "groupId="
-                        + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
-                        idGenerator.getNextId(), this.getMsgtype(), true, groupId),
+                                + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
+                                idGenerator.getNextId(), this.getMsgtype(), true, groupId),
                         callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(
@@ -360,9 +357,9 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public void asyncSendMessage(SendMessageCallback callback,
-                                 byte[] body, String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit,
-                                 Map<String, String> extraAttrMap) throws ProxysdkException {
+            byte[] body, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit,
+            Map<String, String> extraAttrMap) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -393,8 +390,8 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList,
-                                 String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -424,12 +421,12 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public void asyncSendMessage(SendMessageCallback callback,
-                                 List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit,
-                                 Map<String, String> extraAttrMap) throws ProxysdkException {
+            List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit,
+            Map<String, String> extraAttrMap) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-            || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, bodyList.size());
@@ -459,11 +456,11 @@ public class DefaultMessageSender implements MessageSender {
 
     /**
      * asyncSendMessage
-     * 
-     * @param  inlongGroupId
-     * @param  inlongStreamId
-     * @param  body
-     * @param  callback
+     *
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param body
+     * @param callback
      * @throws ProxysdkException
      */
     @Override
@@ -475,11 +472,11 @@ public class DefaultMessageSender implements MessageSender {
 
     /**
      * asyncSendMessage
-     * 
-     * @param  inlongGroupId
-     * @param  inlongStreamId
-     * @param  bodyList
-     * @param  callback
+     *
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param bodyList
+     * @param callback
      * @throws ProxysdkException
      */
     @Override
@@ -499,17 +496,17 @@ public class DefaultMessageSender implements MessageSender {
                 storeIndex.put(key, cnt);
             }
         } catch (Exception e) {
-            logger.error(e.getMessage());
+            LOGGER.error(e.getMessage());
         }
     }
 
     public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId,
-                                     String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
-                                     long timeout, TimeUnit timeUnit,
-                                     Map<String, String> extraAttrMap) throws ProxysdkException {
+            String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
+            long timeout, TimeUnit timeUnit,
+            Map<String, String> extraAttrMap) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-            || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, bodyList.size());
@@ -526,8 +523,8 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     private void asyncSendMetric(FileCallback callback, byte[] body, String groupId,
-                                 String streamId, long dt, int sid, String ip, String msgUUID,
-                                 long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
+            String streamId, long dt, int sid, String ip, String msgUUID,
+            long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -541,23 +538,23 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId,
-                                    long dt, int sid, String ip, String msgUUID,
-                                    long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            long dt, int sid, String ip, String msgUUID,
+            long timeout, TimeUnit timeUnit) throws ProxysdkException {
         asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
     }
 
     public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId,
-                                     String streamId, long dt, int sid, String msgUUID,
-                                     long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            String streamId, long dt, int sid, String msgUUID,
+            long timeout, TimeUnit timeUnit) throws ProxysdkException {
         asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
     }
 
     public String sendMessageData(List<byte[]> bodyList, String groupId,
-                                  String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
-                                  long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+            String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
+            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-            || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES.toString();
         }
         addIndexCnt(groupId, streamId, bodyList.size());
@@ -575,7 +572,7 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID,
-                              long timeout, TimeUnit timeUnit, String messageKey) {
+            long timeout, TimeUnit timeUnit, String messageKey) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES.toString();
@@ -589,26 +586,26 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public String sendMessageProxy(byte[] body, String groupId, String streamId,
-                                   long dt, int sid, String ip, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) {
+            long dt, int sid, String ip, String msgUUID,
+            long timeout, TimeUnit timeUnit) {
         return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
     }
 
     public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
         return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
     }
 
     private void shutdownInternalThreads() {
         indexCol.shutDown();
         managerFetcherThread.shutdown();
-        ManagerFetcherThreadStarted.set(false);
+        MANAGER_FETCHER_THREAD_STARTED.set(false);
     }
 
     public void close() {
-        logger.info("ready to close resources, may need five minutes !");
-        if (sender.getClusterId() != null) {
-            cacheSender.remove(sender.getClusterId());
+        LOGGER.info("ready to close resources, may need five minutes !");
+        if (sender.getClusterId() != -1) {
+            CACHE_SENDER.remove(sender.getClusterId());
         }
         sender.close();
         shutdownInternalThreads();
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index e6516f35f..d68eba7f8 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -100,8 +100,11 @@ public class ProxyClientConfig {
         if (Utils.isBlank(managerIp)) {
             throw new IllegalArgumentException("managerIp is Blank!");
         }
+        if (Utils.isBlank(groupId)) {
+            throw new ProxysdkException("groupId is blank!");
+        }
         this.proxyIPServiceURL =
-                "http://" + managerIp + ":" + managerPort + "/api/inlong/manager/openapi/dataproxy/getIpList";
+                "http://" + managerIp + ":" + managerPort + ConfigConstants.MANAGER_DATAPROXY_API + groupId;
         this.groupId = groupId;
         this.netTag = netTag;
         this.isLocalVisit = isLocalVisit;
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyClusterConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyClusterConfig.java
new file mode 100644
index 000000000..a9a347d2e
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyClusterConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.config;
+
+import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
+
+public class ProxyClusterConfig {
+    private boolean success;
+    private String errMsg;
+    private DataProxyNodeResponse data;
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public String getErrMsg() {
+        return errMsg;
+    }
+
+    public DataProxyNodeResponse getData() {
+        return data;
+    }
+
+}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index 5662e2b37..c975fafa4 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -21,12 +21,10 @@ package org.apache.inlong.sdk.dataproxy.config;
 import java.util.Map;
 
 public class ProxyConfigEntry implements java.io.Serializable {
-    private String clusterId;
+    private int clusterId;
     private String groupId;
     private int size;
     private Map<String, HostInfo> hostMap;
-    private int groupIdNum;
-    private Map<String, Integer> streamIdNumMap;
     private int load;
     private int switchStat;
     private boolean isInterVisit;
@@ -39,19 +37,6 @@ public class ProxyConfigEntry implements java.io.Serializable {
         this.load = load;
     }
 
-    public int getGroupIdNum() {
-        return groupIdNum;
-    }
-
-    public Map<String, Integer> getStreamIdNumMap() {
-        return streamIdNumMap;
-    }
-
-    public void setGroupIdNumAndStreamIdNumMap(int groupIdNum, Map<String, Integer> streamIdNumMap) {
-        this.groupIdNum = groupIdNum;
-        this.streamIdNumMap = streamIdNumMap;
-    }
-
     public int getSwitchStat() {
         return switchStat;
     }
@@ -73,6 +58,10 @@ public class ProxyConfigEntry implements java.io.Serializable {
         return size;
     }
 
+    public void setSize(int size) {
+        this.size = size;
+    }
+
     public String getGroupId() {
         return groupId;
     }
@@ -91,17 +80,16 @@ public class ProxyConfigEntry implements java.io.Serializable {
 
     @Override
     public String toString() {
-        return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", bsn="
-                + groupIdNum + ", tsnMap=" + streamIdNumMap
-                + ", size=" + size + ", isInterVisit=" + isInterVisit + ", groupId=" + groupId
+        return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", size=" + size + ", isInterVisit="
+                + isInterVisit + ", groupId=" + groupId
                 + ", switch=" + switchStat + "]";
     }
 
-    public String getClusterId() {
+    public int getClusterId() {
         return clusterId;
     }
 
-    public void setClusterId(String clusterId) {
+    public void setClusterId(int clusterId) {
         this.clusterId = clusterId;
     }
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 8dc833345..e96bd06c0 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -24,7 +24,11 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.stream.JsonReader;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.config.RequestConfig;
@@ -41,6 +45,8 @@ import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
 import org.apache.http.ssl.SSLContexts;
 import org.apache.http.util.EntityUtils;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.sdk.dataproxy.ConfigConstants;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
@@ -82,7 +88,7 @@ import static org.apache.inlong.sdk.dataproxy.ConfigConstants.REQUEST_HEADER_AUT
 public class ProxyConfigManager extends Thread {
 
     public static final String APPLICATION_JSON = "application/json";
-    private static final Logger logger = LoggerFactory.getLogger(ProxyConfigManager.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProxyConfigManager.class);
     private final ProxyClientConfig clientConfig;
     private final String localIP;
     private final ClientMgr clientManager;
@@ -113,7 +119,7 @@ public class ProxyConfigManager extends Thread {
     }
 
     public void shutDown() {
-        logger.info("Begin to shut down ProxyConfigManager!");
+        LOGGER.info("Begin to shut down ProxyConfigManager!");
         bShutDown = true;
     }
 
@@ -123,9 +129,9 @@ public class ProxyConfigManager extends Thread {
             try {
                 doProxyEntryQueryWork();
                 updateEncryptConfigEntry();
-                logger.info("ProxyConf update!");
+                LOGGER.info("ProxyConf update!");
             } catch (Throwable e) {
-                logger.error("Refresh proxy ip list runs into exception {}, {}", e.toString(), e.getStackTrace());
+                LOGGER.error("Refresh proxy ip list runs into exception {}, {}", e.toString(), e.getStackTrace());
                 e.printStackTrace();
             }
 
@@ -138,13 +144,13 @@ public class ProxyConfigManager extends Thread {
                 if (proxyUpdateIntervalSec > 5) {
                     sleepTimeSec = proxyUpdateIntervalSec + random.nextInt() % (proxyUpdateIntervalSec / 5);
                 }
-                logger.info("sleep time {}", sleepTimeSec);
+                LOGGER.info("sleep time {}", sleepTimeSec);
                 Thread.sleep(sleepTimeSec * 1000);
             } catch (Throwable e2) {
                 //
             }
         }
-        logger.info("ProxyConfigManager worker existed!");
+        LOGGER.info("ProxyConfigManager worker existed!");
     }
 
     /**
@@ -161,11 +167,11 @@ public class ProxyConfigManager extends Thread {
             if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) {
                 JsonReader reader = new JsonReader(new FileReader(configCachePath));
                 ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, ProxyConfigEntry.class);
-                logger.info("{} has a backup! {}", groupId, proxyConfigEntry);
+                LOGGER.info("{} has a backup! {}", groupId, proxyConfigEntry);
                 return proxyConfigEntry;
             }
         } catch (Exception ex) {
-            logger.warn("try to read local cache, caught {}", ex.getMessage());
+            LOGGER.warn("try to read local cache, caught {}", ex.getMessage());
         } finally {
             rw.readLock().unlock();
         }
@@ -180,13 +186,13 @@ public class ProxyConfigManager extends Thread {
                 // try to create parent
                 file.getParentFile().mkdirs();
             }
-            logger.info("try to write {}} to local cache {}", entry, configCachePath);
+            LOGGER.info("try to write {}} to local cache {}", entry, configCachePath);
             FileWriter fileWriter = new FileWriter(configCachePath);
             gson.toJson(entry, fileWriter);
             fileWriter.flush();
             fileWriter.close();
         } catch (Exception ex) {
-            logger.warn("try to write local cache, caught {}", ex.getMessage());
+            LOGGER.warn("try to write local cache, caught {}", ex.getMessage());
         } finally {
             rw.writeLock().unlock();
         }
@@ -196,7 +202,7 @@ public class ProxyConfigManager extends Thread {
         try {
             return requestProxyList(this.clientConfig.getProxyIPServiceURL());
         } catch (Exception e) {
-            logger.warn("try to request proxy list by http, caught {}", e.getMessage());
+            LOGGER.warn("try to request proxy list by http, caught {}", e.getMessage());
         }
         return null;
     }
@@ -271,7 +277,7 @@ public class ProxyConfigManager extends Thread {
             }
             /* We should exit if no local IP list and can't request it from manager.*/
             if (localMd5 == null && proxyEntry == null) {
-                logger.error("Can't connect manager at the start of proxy API {}",
+                LOGGER.error("Can't connect manager at the start of proxy API {}",
                         this.clientConfig.getProxyIPServiceURL());
                 proxyEntry = tryToReadCacheProxyEntry(configAddr);
             }
@@ -281,7 +287,7 @@ public class ProxyConfigManager extends Thread {
                     s.append(tmp.getHostName()).append(";").append(tmp.getPortNumber())
                             .append(",");
                 }
-                logger.warn("Backup proxyEntry [{}]", s);
+                LOGGER.warn("Backup proxyEntry [{}]", s);
             }
         }
         if (localMd5 == null && proxyEntry == null && proxyInfoList == null) {
@@ -302,12 +308,10 @@ public class ProxyConfigManager extends Thread {
      */
     private void compareProxyList(ProxyConfigEntry proxyEntry) {
         if (proxyEntry != null) {
-            logger.info("{}", proxyEntry.toString());
+            LOGGER.info("{}", proxyEntry.toString());
             if (proxyEntry.getSize() != 0) {
                 /* Initialize the current proxy information list first. */
                 clientManager.setLoadThreshold(proxyEntry.getLoad());
-                clientManager.setGroupIdNum(proxyEntry.getGroupIdNum());
-                clientManager.setStreamIdMap(proxyEntry.getStreamIdNumMap());
 
                 List<HostInfo> newProxyInfoList = new ArrayList<HostInfo>();
                 for (Map.Entry<String, HostInfo> entry : proxyEntry.getHostMap().entrySet()) {
@@ -318,7 +322,7 @@ public class ProxyConfigManager extends Thread {
                 String oldMd5 = calcHostInfoMd5(proxyInfoList);
                 if (newMd5 != null && !newMd5.equals(oldMd5)) {
                     /* Choose random alive connections to send messages. */
-                    logger.info("old md5 {} new md5 {}", oldMd5, newMd5);
+                    LOGGER.info("old md5 {} new md5 {}", oldMd5, newMd5);
                     proxyInfoList.clear();
                     proxyInfoList = newProxyInfoList;
                     clientManager.setProxyInfoList(proxyInfoList);
@@ -327,19 +331,19 @@ public class ProxyConfigManager extends Thread {
                     /*judge  cluster's switch state*/
                     oldStat = proxyEntry.getSwitchStat();
                     if ((System.currentTimeMillis() - doworkTime) > 3 * 60 * 1000) {
-                        logger.info("switch the cluster!");
+                        LOGGER.info("switch the cluster!");
                         proxyInfoList.clear();
                         proxyInfoList = newProxyInfoList;
                         clientManager.setProxyInfoList(proxyInfoList);
                     } else {
-                        logger.info("only change oldStat ");
+                        LOGGER.info("only change oldStat ");
                     }
                 } else {
                     newProxyInfoList.clear();
-                    logger.info("proxy IP list doesn't change, load {}", proxyEntry.getLoad());
+                    LOGGER.info("proxy IP list doesn't change, load {}", proxyEntry.getLoad());
                 }
             } else {
-                logger.error("proxyEntry's size is zero");
+                LOGGER.error("proxyEntry's size is zero");
             }
         }
     }
@@ -410,7 +414,7 @@ public class ProxyConfigManager extends Thread {
 
     private EncryptConfigEntry getStoredPubKeyEntry(String userName) {
         if (Utils.isBlank(userName)) {
-            logger.warn(" userName(" + userName + ") is not available");
+            LOGGER.warn(" userName(" + userName + ") is not available");
             return null;
         }
         EncryptConfigEntry entry;
@@ -430,7 +434,7 @@ public class ProxyConfigManager extends Thread {
                 return null;
             }
         } catch (Throwable e1) {
-            logger.error("Read " + userName + " stored PubKeyEntry error ", e1);
+            LOGGER.error("Read " + userName + " stored PubKeyEntry error ", e1);
             return null;
         } finally {
             if (fis != null) {
@@ -462,7 +466,7 @@ public class ProxyConfigManager extends Thread {
             p.flush();
             //p.close();
         } catch (Throwable e) {
-            logger.error("store EncryptConfigEntry " + entry.toString() + " exception ", e);
+            LOGGER.error("store EncryptConfigEntry " + entry.toString() + " exception ", e);
             e.printStackTrace();
         } finally {
             if (fos != null) {
@@ -497,29 +501,34 @@ public class ProxyConfigManager extends Thread {
 
     private EncryptConfigEntry requestPubKey(String pubKeyUrl, String userName, boolean needGet) {
         if (Utils.isBlank(userName)) {
-            logger.error("Queried userName is null!");
+            LOGGER.error("Queried userName is null!");
             return null;
         }
         List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
         params.add(new BasicNameValuePair("operation", "query"));
         params.add(new BasicNameValuePair("username", userName));
-        JsonObject pubKeyConf = requestConfiguration(pubKeyUrl, params);
+        String returnStr = requestConfiguration(pubKeyUrl, params);
+        if (Utils.isBlank(returnStr)) {
+            LOGGER.info("No public key information returned from manager");
+            return null;
+        }
+        JsonObject pubKeyConf = jsonParser.parse(returnStr).getAsJsonObject();
         if (pubKeyConf == null) {
-            logger.info("No public key information returned from manager");
+            LOGGER.info("No public key information returned from manager");
             return null;
         }
         if (!pubKeyConf.has("resultCode")) {
-            logger.info("Parse pubKeyConf failure: No resultCode key information returned from manager");
+            LOGGER.info("Parse pubKeyConf failure: No resultCode key information returned from manager");
             return null;
         }
         int resultCode = pubKeyConf.get("resultCode").getAsInt();
         if (resultCode != 0) {
-            logger.info("query pubKeyConf failure, error code is " + resultCode + ", errInfo is "
+            LOGGER.info("query pubKeyConf failure, error code is " + resultCode + ", errInfo is "
                     + pubKeyConf.get("message").getAsString());
             return null;
         }
         if (!pubKeyConf.has("resultData")) {
-            logger.info("Parse pubKeyConf failure: No resultData key information returned from manager");
+            LOGGER.info("Parse pubKeyConf failure: No resultData key information returned from manager");
             return null;
         }
         JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
@@ -541,78 +550,20 @@ public class ProxyConfigManager extends Thread {
         return null;
     }
 
-    private ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Exception {
-        JsonObject localProxyAddrJson;
+    public ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Exception {
+        DataProxyNodeResponse proxyCluster;
         try {
             byte[] fileBytes = Files.readAllBytes(Paths.get(filePath));
-            localProxyAddrJson = jsonParser.parse(new String(fileBytes)).getAsJsonObject();
+            proxyCluster = gson.fromJson(new String(fileBytes), DataProxyNodeResponse.class);
         } catch (Throwable e) {
             throw new Exception("Read local proxyList File failure by " + filePath + ", reason is " + e.getCause());
         }
-
-        int groupIdNum = 0;
-        if (localProxyAddrJson.has("bsn")) {
-            groupIdNum = localProxyAddrJson.get("bsn").getAsInt();
+        if (ObjectUtils.isEmpty(proxyCluster)) {
+            LOGGER.warn("no proxyCluster configure from local file");
+            return null;
         }
 
-        int load = ConfigConstants.LOAD_THRESHOLD;
-        if (localProxyAddrJson.has("load")) {
-            int inLoad = localProxyAddrJson.get("load").getAsInt();
-            load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
-        }
-        ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
-        proxyEntry.setGroupId(clientConfig.getGroupId());
-        boolean isInterVisit = checkValidProxy(filePath, localProxyAddrJson);
-        proxyEntry.setInterVisit(isInterVisit);
-        Map<String, HostInfo> hostMap = getHostInfoMap(
-                localProxyAddrJson);
-        proxyEntry.setHostMap(hostMap);
-        proxyEntry.setSwitchStat(0);
-        Map<String, Integer> streamIdMap = getStreamIdMap(localProxyAddrJson);
-        proxyEntry.setGroupIdNumAndStreamIdNumMap(groupIdNum, streamIdMap);
-        proxyEntry.setLoad(load);
-        if (localProxyAddrJson.has("clusterId")) {
-            proxyEntry.setClusterId(localProxyAddrJson.get("clusterId").getAsString());
-        }
-        return proxyEntry;
-    }
-
-    private Map<String, HostInfo> getHostInfoMap(JsonObject localProxyAddrJson)
-            throws Exception {
-        Map<String, HostInfo> hostMap = new HashMap<String, HostInfo>();
-        JsonArray jsonHostList = localProxyAddrJson.get("address").getAsJsonArray();
-        if (jsonHostList == null) {
-            throw new Exception("Parse local proxyList failure: address field is not exist!");
-        }
-        for (int i = 0; i < jsonHostList.size(); i++) {
-            JsonObject jsonItem = jsonHostList.get(i).getAsJsonObject();
-            if (jsonItem != null) {
-                if (!jsonItem.has("port")) {
-                    throw new Exception("Parse local proxyList failure: "
-                            + "port field is not exist in address(" + i + ")!");
-                }
-                int port = jsonItem.get("port").getAsInt();
-                if (port <= 0) {
-                    throw new Exception("Parse local proxyList failure: "
-                            + "port value <= 0 in address(" + i + ")!");
-                }
-                if (!jsonItem.has("host")) {
-                    throw new Exception("Parse local proxyList failure: "
-                            + "host field is not exist in address(" + i + ")!");
-                }
-                String hostItem = jsonItem.get("host").getAsString();
-                if (Utils.isBlank(hostItem)) {
-                    throw new Exception("Parse local proxyList failure: "
-                            + "host value is blank in address(" + i + ")!");
-                }
-                String refId = hostItem + ":" + String.valueOf(port);
-                hostMap.put(refId, new HostInfo(refId, hostItem, port));
-            }
-        }
-        if (hostMap.isEmpty()) {
-            throw new Exception("Parse local proxyList failure: address is empty !");
-        }
-        return hostMap;
+        return getProxyConfigEntry(proxyCluster);
     }
 
     private Map<String, Integer> getStreamIdMap(JsonObject localProxyAddrJson) {
@@ -629,106 +580,76 @@ public class ProxyConfigManager extends Thread {
         return streamIdMap;
     }
 
-    private boolean checkValidProxy(String filePath, JsonObject localProxyAddrJson) throws Exception {
-        if (localProxyAddrJson == null) {
-            throw new Exception("Read local proxyList File failure by " + filePath + ", reason is content is null!");
-        }
-        if (!localProxyAddrJson.has("size")) {
-            throw new Exception("Parse local proxyList failure: size field is not exist!");
-        }
-        int size = localProxyAddrJson.get("size").getAsInt();
-        if (size == 0) {
-            throw new Exception("Parse local proxyList failure: proxy list size = 0!");
-        }
-        boolean isInterVisit = false;
-        if (localProxyAddrJson.has("isInterVisit")) {
-            isInterVisit = localProxyAddrJson.get("isInterVisit").getAsInt() != 0;
-        }
-        return isInterVisit;
-    }
-
     public ProxyConfigEntry requestProxyList(String url) {
         ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
         params.add(new BasicNameValuePair("extTag", clientConfig.getNetTag()));
         params.add(new BasicNameValuePair("ip", this.localIP));
-        logger.info("Begin to get configure from manager {}, param is {}", url, params);
+        LOGGER.info("Begin to get configure from manager {}, param is {}", url, params);
 
-        JsonObject jsonRes = requestConfiguration(url, params);
-        if (jsonRes == null) {
+        String resultStr = requestConfiguration(url, params);
+        ProxyClusterConfig clusterConfig = gson.fromJson(resultStr, ProxyClusterConfig.class);
+        if (clusterConfig == null || !clusterConfig.isSuccess() || clusterConfig.getData() == null) {
             return null;
         }
 
-        Map<String, HostInfo> hostMap = formatHostInfoMap(
-                jsonRes);
+        DataProxyNodeResponse proxyCluster = clusterConfig.getData();
+        return getProxyConfigEntry(proxyCluster);
+    }
 
-        if (hostMap == null) {
+    private ProxyConfigEntry getProxyConfigEntry(DataProxyNodeResponse proxyCluster) {
+        List<DataProxyNodeInfo> nodeList = proxyCluster.getNodeList();
+        if (CollectionUtils.isEmpty(nodeList)) {
+            LOGGER.error("dataproxy nodeList is empty in DataProxyNodeResponse!");
             return null;
         }
-        int groupIdNum = 0;
-        if (jsonRes.has("bsn")) {
-            groupIdNum = jsonRes.get("bsn").getAsInt();
+        Map<String, HostInfo> hostMap = formatHostInfoMap(nodeList);
+        if (MapUtils.isEmpty(hostMap)) {
+            return null;
+        }
+
+        int clusterId = -1;
+        if (ObjectUtils.isNotEmpty(proxyCluster.getClusterId())) {
+            clusterId = proxyCluster.getClusterId();
         }
         int load = ConfigConstants.LOAD_THRESHOLD;
-        if (jsonRes.has("load")) {
-            int inLoad = jsonRes.get("load").getAsInt();
-            load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
+        if (ObjectUtils.isNotEmpty(proxyCluster.getLoad())) {
+            load = proxyCluster.getLoad() > 200 ? 200 : (Math.max(proxyCluster.getLoad(), 0));
+        }
+        boolean isIntranet = true;
+        if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
+            isIntranet = proxyCluster.getIsIntranet() == 1 ? true : false;
+        }
+        int isSwitch = 0;
+        if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
+            isSwitch = proxyCluster.getIsSwitch();
         }
+
         ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
+        proxyEntry.setClusterId(clusterId);
         proxyEntry.setGroupId(clientConfig.getGroupId());
-        proxyEntry.setInterVisit(true);
+        proxyEntry.setInterVisit(isIntranet);
         proxyEntry.setHostMap(hostMap);
-        proxyEntry.setSwitchStat(0);
-        Map<String, Integer> streamIdMap = getStreamIdMap(jsonRes);
-        proxyEntry.setGroupIdNumAndStreamIdNumMap(groupIdNum, streamIdMap);
+        proxyEntry.setSwitchStat(isSwitch);
         proxyEntry.setLoad(load);
-        if (jsonRes.has("data")) {
-            JsonArray data = jsonRes.getAsJsonArray("data");
-            if (data != null) {
-                String id = data.get(0).getAsJsonObject().get("id").getAsString();
-                proxyEntry.setClusterId(id);
-            }
-        }
+        proxyEntry.setSize(nodeList.size());
         return proxyEntry;
     }
 
-    private Map<String, HostInfo> formatHostInfoMap(JsonObject jsonRes) {
-        Map<String, HostInfo> hostMap = new HashMap<String, HostInfo>();
-        JsonArray jsonHostList = jsonRes.getAsJsonArray("data");
-        if (jsonHostList == null) {
-            logger.info("Parse proxyList failure: address field is not exist for response from manager!");
-            return null;
-        }
-        for (int i = 0; i < jsonHostList.size(); i++) {
-            JsonObject jsonItem = jsonHostList.get(i).getAsJsonObject();
-            if (jsonItem != null) {
-                if (!jsonItem.has("port")) {
-                    logger.error("Parse proxyList failure: port field is not exist in address("
-                            + i + ") for response from manager!");
-                    return null;
-                }
-                int port = jsonItem.get("port").getAsInt();
-                if (port <= 0) {
-                    logger.info("Parse proxyList failure: port value <= 0 in address("
-                            + i + ") for response from manager!");
-                    return null;
-                }
-                if (!jsonItem.has("ip")) {
-                    logger.error("Parse proxyList failure: host field is not exist in address("
-                            + i + ") for response from manager!");
-                    return null;
-                }
-                String hostItem = jsonItem.get("ip").getAsString();
-                if (Utils.isBlank(hostItem)) {
-                    logger.error("Parse proxyList failure: host value is blank in address("
-                            + i + ") for response from manager!");
-                    return null;
-                }
-                String refId = hostItem + ":" + String.valueOf(port);
-                hostMap.put(refId, new HostInfo(refId, hostItem, port));
+    private Map<String, HostInfo> formatHostInfoMap(List<DataProxyNodeInfo> nodeList) {
+        Map<String, HostInfo> hostMap = new HashMap<>();
+        for (DataProxyNodeInfo proxy : nodeList) {
+            if (ObjectUtils.isEmpty(proxy.getId()) || StringUtils.isEmpty(proxy.getIp()) || ObjectUtils
+                    .isEmpty(proxy.getPort()) || proxy.getPort() < 0) {
+                LOGGER.error("invalid proxy node, id:{}, ip:{}, port:{}", proxy.getId(), proxy.getIp(),
+                        proxy.getPort());
+                continue;
             }
+            String refId = proxy.getIp() + ":" + proxy.getPort();
+            hostMap.put(refId, new HostInfo(refId, proxy.getIp(), proxy.getPort()));
+
         }
         if (hostMap.isEmpty()) {
-            logger.error("Parse proxyList failure: address is empty for response from manager!");
+            LOGGER.error("Parse proxyList failure: address is empty for response from manager!");
             return null;
         }
         return hostMap;
@@ -768,9 +689,9 @@ public class ProxyConfigManager extends Thread {
     }
 
     /* Request new configurations from Manager. */
-    private JsonObject requestConfiguration(String url, List<BasicNameValuePair> params) {
+    private String requestConfiguration(String url, List<BasicNameValuePair> params) {
         if (Utils.isBlank(url)) {
-            logger.error("request url is null");
+            LOGGER.error("request url is null");
             return null;
         }
         // get local managerIpList
@@ -789,7 +710,7 @@ public class ProxyConfigManager extends Thread {
                 try {
                     httpClient = getCloseableHttpClient(params);
                 } catch (Throwable eHttps) {
-                    logger.error("Create Https cliet failure, error 1 is ", eHttps);
+                    LOGGER.error("Create Https cliet failure, error 1 is ", eHttps);
                     eHttps.printStackTrace();
                     return null;
                 }
@@ -805,7 +726,7 @@ public class ProxyConfigManager extends Thread {
             }
             tryIdx++;
 
-            logger.info("Request url : " + url + ", localManagerIps : " + localManagerIps);
+            LOGGER.info("Request url : " + url + ", localManagerIps : " + localManagerIps);
             try {
                 httpPost = new HttpPost(url);
                 if (this.clientConfig.isNeedAuthentication()) {
@@ -821,16 +742,15 @@ public class ProxyConfigManager extends Thread {
                 HttpResponse response = httpClient.execute(httpPost);
                 returnStr = EntityUtils.toString(response.getEntity());
                 if (Utils.isNotBlank(returnStr) && response.getStatusLine().getStatusCode() == 200) {
-                    logger.info("Get configure from manager is " + returnStr);
-                    JsonObject jsonRes = jsonParser.parse(returnStr).getAsJsonObject();
-                    return jsonRes;
+                    LOGGER.info("Get configure from manager is " + returnStr);
+                    return returnStr;
                 }
 
                 if (!clientConfig.isLocalVisit()) {
                     return null;
                 }
             } catch (Throwable e) {
-                logger.error("Connect Manager error, message: {}, url is {}", e.getMessage(), url);
+                LOGGER.error("Connect Manager error, message: {}, url is {}", e.getMessage(), url);
 
                 if (!clientConfig.isLocalVisit()) {
                     return null;
@@ -887,7 +807,7 @@ public class ProxyConfigManager extends Thread {
                 byte[] serialized;
                 serialized = FileUtils.readFileToByteArray(localManagerIpsFile);
                 if (serialized == null) {
-                    logger.error("Local managerIp file is empty, file path : "
+                    LOGGER.error("Local managerIp file is empty, file path : "
                             + clientConfig.getManagerIpLocalPath());
                     return null;
                 }
@@ -897,12 +817,12 @@ public class ProxyConfigManager extends Thread {
                     localManagerIpsFile.getParentFile().mkdirs();
                 }
                 localManagerIps = "";
-                logger.error("Get local managerIpList not exist, file path : "
+                LOGGER.error("Get local managerIpList not exist, file path : "
                         + clientConfig.getManagerIpLocalPath());
             }
         } catch (Throwable t) {
             localManagerIps = "";
-            logger.error("Get local managerIpList occur exception,", t);
+            LOGGER.error("Get local managerIpList occur exception,", t);
         }
         return localManagerIps;
     }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index de6920993..5f2c72ebc 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -65,7 +65,7 @@ public class Sender {
     private final ProxyClientConfig configure;
     private final boolean isFile;
     private final MetricWorkerThread metricWorker;
-    private String clusterId;
+    private int clusterId = -1;
 
     public Sender(ProxyClientConfig configure) throws Exception {
         this(configure, null);
@@ -696,11 +696,11 @@ public class Sender {
         callbacks.clear();
     }
 
-    public String getClusterId() {
+    public int getClusterId() {
         return clusterId;
     }
 
-    public void setClusterId(String clusterId) {
+    public void setClusterId(int clusterId) {
         this.clusterId = clusterId;
     }
 
diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
new file mode 100644
index 000000000..d6817eb7a
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.junit.Assert;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+public class ProxyConfigManagerTest {
+
+    private final String localFile = Paths.get(
+            Objects.requireNonNull(this.getClass().getClassLoader().getResource("proxylist.json")).toURI())
+            .toString();
+    private final ProxyClientConfig clientConfig = PowerMockito.mock(ProxyClientConfig.class);
+    private final ClientMgr clientMgr = PowerMockito.mock(ClientMgr.class);
+    private final ProxyConfigManager proxyConfigManager = new ProxyConfigManager(clientConfig, "127.0.0.1",
+            clientMgr);
+
+    public ProxyConfigManagerTest() throws URISyntaxException {
+    }
+
+    @Test
+    public void testProxyConfigParse() throws Exception {
+        ProxyConfigEntry proxyEntry = proxyConfigManager.getLocalProxyListFromFile(localFile);
+        Assert.assertEquals(proxyEntry.isInterVisit(), false);
+        Assert.assertEquals(proxyEntry.getLoad(), 12);
+        Assert.assertEquals(proxyEntry.getClusterId(), 1);
+        Assert.assertEquals(proxyEntry.getSize(), 2);
+        Assert.assertEquals(proxyEntry.getHostMap().containsKey("127.0.0.1:46801"), true);
+        Assert.assertEquals(proxyEntry.getHostMap().containsKey("127.0.0.1:8080"), false);
+    }
+
+}
diff --git a/inlong-sdk/dataproxy-sdk/src/test/resources/proxylist.json b/inlong-sdk/dataproxy-sdk/src/test/resources/proxylist.json
new file mode 100644
index 000000000..ffcc6efa6
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk/src/test/resources/proxylist.json
@@ -0,0 +1,18 @@
+{
+  "clusterId": 1,
+  "load": 12,
+  "isIntranet": 0,
+  "isSwitch": 1,
+  "nodeList": [
+    {
+      "id": 1,
+      "ip": "127.0.0.1",
+      "port": 46801
+    },
+    {
+      "id": 2,
+      "ip": "127.0.0.2",
+      "port": 46801
+    }
+  ]
+}
\ No newline at end of file