You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/24 13:15:35 UTC

[incubator-inlong] branch master updated: [INLONG-4146][Manager] Get DataProxy configuration from inlong_cluster and inlong_group (#4152)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 86313d3ca [INLONG-4146][Manager] Get DataProxy configuration from inlong_cluster and inlong_group (#4152)
86313d3ca is described below

commit 86313d3ca1d71ec190278d469ac723762fff8306
Author: 卢春亮 <94...@qq.com>
AuthorDate: Tue May 24 21:15:29 2022 +0800

    [INLONG-4146][Manager] Get DataProxy configuration from inlong_cluster and inlong_group (#4152)
    
    * Get DataProxy configuration data from inlong_cluster and inlong_group.
    
    * Move resultType to manager-common
    
    * Remove unused tables
---
 .../common/pojo/dataproxy}/CacheCluster.java       |  79 +++-
 .../common/pojo/dataproxy/InlongGroupId.java}      |  85 ++--
 .../common/pojo/dataproxy/InlongStreamId.java}     |  85 ++--
 .../common/pojo/dataproxy/ProxyCluster.java}       |  64 ++-
 .../inlong/manager/dao/entity/CacheTopic.java      |  82 ----
 .../inlong/manager/dao/entity/ClusterSet.java      | 215 ---------
 .../inlong/manager/dao/entity/FlumeChannel.java    |  82 ----
 .../inlong/manager/dao/entity/FlumeChannelExt.java | 120 -----
 .../inlong/manager/dao/entity/FlumeSink.java       | 101 -----
 .../inlong/manager/dao/entity/FlumeSinkExt.java    | 120 -----
 .../inlong/manager/dao/entity/FlumeSource.java     | 120 -----
 .../inlong/manager/dao/entity/FlumeSourceExt.java  | 120 -----
 .../inlong/manager/dao/entity/ProxyCluster.java    |  82 ----
 .../manager/dao/mapper/ClusterSetMapper.java       |  45 +-
 .../main/resources/mappers/ClusterSetMapper.xml    | 103 ++---
 .../service/core/DataProxyClusterService.java      |   2 +-
 .../core/impl/DataProxyClusterServiceImpl.java     |  14 +-
 .../repository/DataProxyConfigRepository.java      | 501 ++++++++-------------
 .../manager-web/sql/apache_inlong_manager.sql      | 203 ---------
 .../controller/openapi/DataProxyController.java    |  10 +-
 .../src/main/resources/application.properties      |   1 +
 .../sort/standalone/dispatch/DispatchManager.java  |  43 +-
 22 files changed, 430 insertions(+), 1847 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheCluster.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
similarity index 51%
rename from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheCluster.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
index 9aec0c6e0..c8281ae1e 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheCluster.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
@@ -15,19 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.dataproxy;
 
 /**
  * CacheCluster
  */
 public class CacheCluster {
+
     private String clusterName;
-    private String setName;
-    private String zone;
+    private String type;
+    private String clusterTag;
+    private String extTag;
+    private String extParams;
 
     /**
      * get clusterName
-     * 
      * @return the clusterName
      */
     public String getClusterName() {
@@ -36,7 +38,6 @@ public class CacheCluster {
 
     /**
      * set clusterName
-     * 
      * @param clusterName the clusterName to set
      */
     public void setClusterName(String clusterName) {
@@ -44,39 +45,67 @@ public class CacheCluster {
     }
 
     /**
-     * get setName
-     * 
-     * @return the setName
+     * get type
+     * @return the type
+     */
+    public String getType() {
+        return type;
+    }
+
+    /**
+     * set type
+     * @param type the type to set
+     */
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /**
+     * get clusterTag
+     * @return the clusterTag
+     */
+    public String getClusterTag() {
+        return clusterTag;
+    }
+
+    /**
+     * set clusterTag
+     * @param clusterTag the clusterTag to set
+     */
+    public void setClusterTag(String clusterTag) {
+        this.clusterTag = clusterTag;
+    }
+
+    /**
+     * get extTag
+     * @return the extTag
      */
-    public String getSetName() {
-        return setName;
+    public String getExtTag() {
+        return extTag;
     }
 
     /**
-     * set setName
-     * 
-     * @param setName the setName to set
+     * set extTag
+     * @param extTag the extTag to set
      */
-    public void setSetName(String setName) {
-        this.setName = setName;
+    public void setExtTag(String extTag) {
+        this.extTag = extTag;
     }
 
     /**
-     * get zone
-     * 
-     * @return the zone
+     * get extParams
+     * @return the extParams
      */
-    public String getZone() {
-        return zone;
+    public String getExtParams() {
+        return extParams;
     }
 
     /**
-     * set zone
-     * 
-     * @param zone the zone to set
+     * set extParams
+     * @param extParams the extParams to set
      */
-    public void setZone(String zone) {
-        this.zone = zone;
+    public void setExtParams(String extParams) {
+        this.extParams = extParams;
     }
 
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongGroupId.java
similarity index 54%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongGroupId.java
index 0de815720..c50c97b74 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongGroupId.java
@@ -15,87 +15,80 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.dataproxy;
 
 /**
- * InLongId
+ * InlongGroupId
  */
-public class InLongId {
-    private String inlongId;
+public class InlongGroupId {
+
+    private String inlongGroupId;
+    private String clusterTag;
     private String topic;
-    private String params;
-    private String setName;
+    private String extParams;
 
     /**
-     * get inlongId
-     * 
-     * @return the inlongId
+     * get inlongGroupId
+     * @return the inlongGroupId
      */
-    public String getInlongId() {
-        return inlongId;
+    public String getInlongGroupId() {
+        return inlongGroupId;
     }
 
     /**
-     * set inlongId
-     * 
-     * @param inlongId the inlongId to set
+     * set inlongGroupId
+     * @param inlongGroupId the inlongGroupId to set
      */
-    public void setInlongId(String inlongId) {
-        this.inlongId = inlongId;
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
     }
 
     /**
-     * get topic
-     * 
-     * @return the topic
+     * get clusterTag
+     * @return the clusterTag
      */
-    public String getTopic() {
-        return topic;
+    public String getClusterTag() {
+        return clusterTag;
     }
 
     /**
-     * set topic
-     * 
-     * @param topic the topic to set
+     * set clusterTag
+     * @param clusterTag the clusterTag to set
      */
-    public void setTopic(String topic) {
-        this.topic = topic;
+    public void setClusterTag(String clusterTag) {
+        this.clusterTag = clusterTag;
     }
 
     /**
-     * get params
-     * 
-     * @return the params
+     * get topic
+     * @return the topic
      */
-    public String getParams() {
-        return params;
+    public String getTopic() {
+        return topic;
     }
 
     /**
-     * set params
-     * 
-     * @param params the params to set
+     * set topic
+     * @param topic the topic to set
      */
-    public void setParams(String params) {
-        this.params = params;
+    public void setTopic(String topic) {
+        this.topic = topic;
     }
 
     /**
-     * get setName
-     * 
-     * @return the setName
+     * get extParams
+     * @return the extParams
      */
-    public String getSetName() {
-        return setName;
+    public String getExtParams() {
+        return extParams;
     }
 
     /**
-     * set setName
-     * 
-     * @param setName the setName to set
+     * set extParams
+     * @param extParams the extParams to set
      */
-    public void setSetName(String setName) {
-        this.setName = setName;
+    public void setExtParams(String extParams) {
+        this.extParams = extParams;
     }
 
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongStreamId.java
similarity index 53%
rename from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongStreamId.java
index 0de815720..3ac54690f 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongStreamId.java
@@ -15,87 +15,80 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.dataproxy;
 
 /**
- * InLongId
+ * InlongStreamId
  */
-public class InLongId {
-    private String inlongId;
+public class InlongStreamId {
+
+    private String inlongGroupId;
+    private String inlongStreamId;
     private String topic;
-    private String params;
-    private String setName;
+    private String extParams;
 
     /**
-     * get inlongId
-     * 
-     * @return the inlongId
+     * get inlongGroupId
+     * @return the inlongGroupId
      */
-    public String getInlongId() {
-        return inlongId;
+    public String getInlongGroupId() {
+        return inlongGroupId;
     }
 
     /**
-     * set inlongId
-     * 
-     * @param inlongId the inlongId to set
+     * set inlongGroupId
+     * @param inlongGroupId the inlongGroupId to set
      */
-    public void setInlongId(String inlongId) {
-        this.inlongId = inlongId;
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
     }
 
     /**
-     * get topic
-     * 
-     * @return the topic
+     * get inlongStreamId
+     * @return the inlongStreamId
      */
-    public String getTopic() {
-        return topic;
+    public String getInlongStreamId() {
+        return inlongStreamId;
     }
 
     /**
-     * set topic
-     * 
-     * @param topic the topic to set
+     * set inlongStreamId
+     * @param inlongStreamId the inlongStreamId to set
      */
-    public void setTopic(String topic) {
-        this.topic = topic;
+    public void setInlongStreamId(String inlongStreamId) {
+        this.inlongStreamId = inlongStreamId;
     }
 
     /**
-     * get params
-     * 
-     * @return the params
+     * get topic
+     * @return the topic
      */
-    public String getParams() {
-        return params;
+    public String getTopic() {
+        return topic;
     }
 
     /**
-     * set params
-     * 
-     * @param params the params to set
+     * set topic
+     * @param topic the topic to set
      */
-    public void setParams(String params) {
-        this.params = params;
+    public void setTopic(String topic) {
+        this.topic = topic;
     }
 
     /**
-     * get setName
-     * 
-     * @return the setName
+     * get extParams
+     * @return the extParams
      */
-    public String getSetName() {
-        return setName;
+    public String getExtParams() {
+        return extParams;
     }
 
     /**
-     * set setName
-     * 
-     * @param setName the setName to set
+     * set extParams
+     * @param extParams the extParams to set
      */
-    public void setSetName(String setName) {
-        this.setName = setName;
+    public void setExtParams(String extParams) {
+        this.extParams = extParams;
     }
 
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheClusterExt.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/ProxyCluster.java
similarity index 57%
rename from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheClusterExt.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/ProxyCluster.java
index 3f728bd79..5bdc069d9 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheClusterExt.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/ProxyCluster.java
@@ -15,21 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.dataproxy;
 
 /**
- * CacheClusterExt
+ * ProxyCluster
  */
-public class CacheClusterExt {
+public class ProxyCluster {
 
     private String clusterName;
-    private String keyName;
-    private String keyValue;
-    private Integer isDeleted;
+    private String clusterTag;
+    private String extTag;
+    private String extParams;
 
     /**
      * get clusterName
-     * 
      * @return the clusterName
      */
     public String getClusterName() {
@@ -38,7 +37,6 @@ public class CacheClusterExt {
 
     /**
      * set clusterName
-     * 
      * @param clusterName the clusterName to set
      */
     public void setClusterName(String clusterName) {
@@ -46,53 +44,51 @@ public class CacheClusterExt {
     }
 
     /**
-     * get keyName
-     * 
-     * @return the keyName
+     * get clusterTag
+     * @return the clusterTag
      */
-    public String getKeyName() {
-        return keyName;
+    public String getClusterTag() {
+        return clusterTag;
     }
 
     /**
-     * set keyName
-     * 
-     * @param keyName the keyName to set
+     * set clusterTag
+     * @param clusterTag the clusterTag to set
      */
-    public void setKeyName(String keyName) {
-        this.keyName = keyName;
+    public void setClusterTag(String clusterTag) {
+        this.clusterTag = clusterTag;
     }
 
     /**
-     * get keyValue
-     * 
-     * @return the keyValue
+     * get extTag
+     * @return the extTag
      */
-    public String getKeyValue() {
-        return keyValue;
+    public String getExtTag() {
+        return extTag;
     }
 
     /**
-     * set keyValue
-     * 
-     * @param keyValue the keyValue to set
+     * set extTag
+     * @param extTag the extTag to set
      */
-    public void setKeyValue(String keyValue) {
-        this.keyValue = keyValue;
+    public void setExtTag(String extTag) {
+        this.extTag = extTag;
     }
 
     /**
-     * getJIsDeleted
+     * get extParams
+     * @return the extParams
      */
-    public Integer getJIsDeleted() {
-        return isDeleted;
+    public String getExtParams() {
+        return extParams;
     }
 
     /**
-     * setIsDeleted
+     * set extParams
+     * @param extParams the extParams to set
      */
-    public void setIsDeleted(Integer isDeleted) {
-        this.isDeleted = isDeleted;
+    public void setExtParams(String extParams) {
+        this.extParams = extParams;
     }
 
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheTopic.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheTopic.java
deleted file mode 100644
index 26b5bd1ef..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheTopic.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * CacheTopic
- */
-public class CacheTopic {
-    private String topicName;
-    private String setName;
-    private Integer partitionNum;
-
-    /**
-     * get topicName
-     * 
-     * @return the topicName
-     */
-    public String getTopicName() {
-        return topicName;
-    }
-
-    /**
-     * set topicName
-     * 
-     * @param topicName the topicName to set
-     */
-    public void setTopicName(String topicName) {
-        this.topicName = topicName;
-    }
-
-    /**
-     * get setName
-     * 
-     * @return the setName
-     */
-    public String getSetName() {
-        return setName;
-    }
-
-    /**
-     * set setName
-     * 
-     * @param setName the setName to set
-     */
-    public void setSetName(String setName) {
-        this.setName = setName;
-    }
-
-    /**
-     * get partitionNum
-     * 
-     * @return the partitionNum
-     */
-    public Integer getPartitionNum() {
-        return partitionNum;
-    }
-
-    /**
-     * set partitionNum
-     * 
-     * @param partitionNum the partitionNum to set
-     */
-    public void setPartitionNum(Integer partitionNum) {
-        this.partitionNum = partitionNum;
-    }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ClusterSet.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ClusterSet.java
deleted file mode 100644
index 9344ebfd6..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ClusterSet.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * ClusterSet
- */
-public class ClusterSet {
-    private String setName;
-    private String cnName;
-    private String description;
-    private String middlewareType;
-    private String inCharges;
-    private String followers;
-    private Integer status;
-    private Integer isDeleted;
-    private String creator;
-    private String modifier;
-
-    /**
-     * get setName
-     * 
-     * @return the setName
-     */
-    public String getSetName() {
-        return setName;
-    }
-
-    /**
-     * set setName
-     * 
-     * @param setName the setName to set
-     */
-    public void setSetName(String setName) {
-        this.setName = setName;
-    }
-
-    /**
-     * get cnName
-     * 
-     * @return the cnName
-     */
-    public String getCnName() {
-        return cnName;
-    }
-
-    /**
-     * set cnName
-     * 
-     * @param cnName the cnName to set
-     */
-    public void setCnName(String cnName) {
-        this.cnName = cnName;
-    }
-
-    /**
-     * get description
-     * 
-     * @return the description
-     */
-    public String getDescription() {
-        return description;
-    }
-
-    /**
-     * set description
-     * 
-     * @param description the description to set
-     */
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    /**
-     * get middlewareType
-     * 
-     * @return the middlewareType
-     */
-    public String getMiddlewareType() {
-        return middlewareType;
-    }
-
-    /**
-     * set middlewareType
-     * 
-     * @param middlewareType the middlewareType to set
-     */
-    public void setMiddlewareType(String middlewareType) {
-        this.middlewareType = middlewareType;
-    }
-
-    /**
-     * get inCharges
-     * 
-     * @return the inCharges
-     */
-    public String getInCharges() {
-        return inCharges;
-    }
-
-    /**
-     * set inCharges
-     * 
-     * @param inCharges the inCharges to set
-     */
-    public void setInCharges(String inCharges) {
-        this.inCharges = inCharges;
-    }
-
-    /**
-     * get followers
-     * 
-     * @return the followers
-     */
-    public String getFollowers() {
-        return followers;
-    }
-
-    /**
-     * set followers
-     * 
-     * @param followers the followers to set
-     */
-    public void setFollowers(String followers) {
-        this.followers = followers;
-    }
-
-    /**
-     * get status
-     * 
-     * @return the status
-     */
-    public Integer getStatus() {
-        return status;
-    }
-
-    /**
-     * set status
-     * 
-     * @param status the status to set
-     */
-    public void setStatus(Integer status) {
-        this.status = status;
-    }
-
-    /**
-     * get isDeleted
-     * 
-     * @return the isDeleted
-     */
-    public Integer getIsDeleted() {
-        return isDeleted;
-    }
-
-    /**
-     * set isDeleted
-     * 
-     * @param isDeleted the isDeleted to set
-     */
-    public void setIsDeleted(Integer isDeleted) {
-        this.isDeleted = isDeleted;
-    }
-
-    /**
-     * get creator
-     * 
-     * @return the creator
-     */
-    public String getCreator() {
-        return creator;
-    }
-
-    /**
-     * set creator
-     * 
-     * @param creator the creator to set
-     */
-    public void setCreator(String creator) {
-        this.creator = creator;
-    }
-
-    /**
-     * get modifier
-     * 
-     * @return the modifier
-     */
-    public String getModifier() {
-        return modifier;
-    }
-
-    /**
-     * set modifier
-     * 
-     * @param modifier the modifier to set
-     */
-    public void setModifier(String modifier) {
-        this.modifier = modifier;
-    }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannel.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannel.java
deleted file mode 100644
index 1b19ff816..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannel.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeChannel
- */
-public class FlumeChannel {
-    private String channelName;
-    private String setName;
-    private String type;
-
-    /**
-     * get channelName
-     * 
-     * @return the channelName
-     */
-    public String getChannelName() {
-        return channelName;
-    }
-
-    /**
-     * set channelName
-     * 
-     * @param channelName the channelName to set
-     */
-    public void setChannelName(String channelName) {
-        this.channelName = channelName;
-    }
-
-    /**
-     * get setName
-     * 
-     * @return the setName
-     */
-    public String getSetName() {
-        return setName;
-    }
-
-    /**
-     * set setName
-     * 
-     * @param setName the setName to set
-     */
-    public void setSetName(String setName) {
-        this.setName = setName;
-    }
-
-    /**
-     * get type
-     * 
-     * @return the type
-     */
-    public String getType() {
-        return type;
-    }
-
-    /**
-     * set type
-     * 
-     * @param type the type to set
-     */
-    public void setType(String type) {
-        this.type = type;
-    }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannelExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannelExt.java
deleted file mode 100644
index 4fa5d7d11..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannelExt.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeChannelExt
- */
-public class FlumeChannelExt {
-    private String parentName;
-    private String setName;
-    private String keyName;
-    private String keyValue;
-    private Integer isDeleted;
-
-    /**
-     * get parentName
-     * 
-     * @return the parentName
-     */
-    public String getParentName() {
-        return parentName;
-    }
-
-    /**
-     * set parentName
-     * 
-     * @param parentName the parentName to set
-     */
-    public void setParentName(String parentName) {
-        this.parentName = parentName;
-    }
-
-    /**
-     * get setName
-     * 
-     * @return the setName
-     */
-    public String getSetName() {
-        return setName;
-    }
-
-    /**
-     * set setName
-     * 
-     * @param setName the setName to set
-     */
-    public void setSetName(String setName) {
-        this.setName = setName;
-    }
-
-    /**
-     * get keyName
-     * 
-     * @return the keyName
-     */
-    public String getKeyName() {
-        return keyName;
-    }
-
-    /**
-     * set keyName
-     * 
-     * @param keyName the keyName to set
-     */
-    public void setKeyName(String keyName) {
-        this.keyName = keyName;
-    }
-
-    /**
-     * get keyValue
-     * 
-     * @return the keyValue
-     */
-    public String getKeyValue() {
-        return keyValue;
-    }
-
-    /**
-     * set keyValue
-     * 
-     * @param keyValue the keyValue to set
-     */
-    public void setKeyValue(String keyValue) {
-        this.keyValue = keyValue;
-    }
-
-    /**
-     * get isDeleted
-     * 
-     * @return the isDeleted
-     */
-    public Integer getIsDeleted() {
-        return isDeleted;
-    }
-
-    /**
-     * set isDeleted
-     * 
-     * @param isDeleted the isDeleted to set
-     */
-    public void setIsDeleted(Integer isDeleted) {
-        this.isDeleted = isDeleted;
-    }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSink.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSink.java
deleted file mode 100644
index 54c536979..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSink.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeSink
- */
-public class FlumeSink {
-    private String sinkName;
-    private String setName;
-    private String type;
-    private String channel;
-
-    /**
-     * get sinkName
-     * 
-     * @return the sinkName
-     */
-    public String getSinkName() {
-        return sinkName;
-    }
-
-    /**
-     * set sinkName
-     * 
-     * @param sinkName the sinkName to set
-     */
-    public void setSinkName(String sinkName) {
-        this.sinkName = sinkName;
-    }
-
-    /**
-     * get setName
-     * 
-     * @return the setName
-     */
-    public String getSetName() {
-        return setName;
-    }
-
-    /**
-     * set setName
-     * 
-     * @param setName the setName to set
-     */
-    public void setSetName(String setName) {
-        this.setName = setName;
-    }
-
-    /**
-     * get type
-     * 
-     * @return the type
-     */
-    public String getType() {
-        return type;
-    }
-
-    /**
-     * set type
-     * 
-     * @param type the type to set
-     */
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    /**
-     * get channel
-     * 
-     * @return the channel
-     */
-    public String getChannel() {
-        return channel;
-    }
-
-    /**
-     * set channel
-     * 
-     * @param channel the channel to set
-     */
-    public void setChannel(String channel) {
-        this.channel = channel;
-    }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSinkExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSinkExt.java
deleted file mode 100644
index 8022da8db..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSinkExt.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeSinkExt
- */
-public class FlumeSinkExt {
-    private String parentName;
-    private String setName;
-    private String keyName;
-    private String keyValue;
-    private Integer isDeleted;
-
-    /**
-     * get parentName
-     * 
-     * @return the parentName
-     */
-    public String getParentName() {
-        return parentName;
-    }
-
-    /**
-     * set parentName
-     * 
-     * @param parentName the parentName to set
-     */
-    public void setParentName(String parentName) {
-        this.parentName = parentName;
-    }
-
-    /**
-     * get setName
-     * 
-     * @return the setName
-     */
-    public String getSetName() {
-        return setName;
-    }
-
-    /**
-     * set setName
-     * 
-     * @param setName the setName to set
-     */
-    public void setSetName(String setName) {
-        this.setName = setName;
-    }
-
-    /**
-     * get keyName
-     * 
-     * @return the keyName
-     */
-    public String getKeyName() {
-        return keyName;
-    }
-
-    /**
-     * set keyName
-     * 
-     * @param keyName the keyName to set
-     */
-    public void setKeyName(String keyName) {
-        this.keyName = keyName;
-    }
-
-    /**
-     * get keyValue
-     * 
-     * @return the keyValue
-     */
-    public String getKeyValue() {
-        return keyValue;
-    }
-
-    /**
-     * set keyValue
-     * 
-     * @param keyValue the keyValue to set
-     */
-    public void setKeyValue(String keyValue) {
-        this.keyValue = keyValue;
-    }
-
-    /**
-     * get isDeleted
-     * 
-     * @return the isDeleted
-     */
-    public Integer getIsDeleted() {
-        return isDeleted;
-    }
-
-    /**
-     * set isDeleted
-     * 
-     * @param isDeleted the isDeleted to set
-     */
-    public void setIsDeleted(Integer isDeleted) {
-        this.isDeleted = isDeleted;
-    }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSource.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSource.java
deleted file mode 100644
index 0afb4a798..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSource.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeSource
- */
-public class FlumeSource {
-    private String sourceName;
-    private String setName;
-    private String type;
-    private String channels;
-    private String selectorType;
-
-    /**
-     * get sourceName
-     * 
-     * @return the sourceName
-     */
-    public String getSourceName() {
-        return sourceName;
-    }
-
-    /**
-     * set sourceName
-     * 
-     * @param sourceName the sourceName to set
-     */
-    public void setSourceName(String sourceName) {
-        this.sourceName = sourceName;
-    }
-
-    /**
-     * get setName
-     * 
-     * @return the setName
-     */
-    public String getSetName() {
-        return setName;
-    }
-
-    /**
-     * set setName
-     * 
-     * @param setName the setName to set
-     */
-    public void setSetName(String setName) {
-        this.setName = setName;
-    }
-
-    /**
-     * get type
-     * 
-     * @return the type
-     */
-    public String getType() {
-        return type;
-    }
-
-    /**
-     * set type
-     * 
-     * @param type the type to set
-     */
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    /**
-     * get channels
-     * 
-     * @return the channels
-     */
-    public String getChannels() {
-        return channels;
-    }
-
-    /**
-     * set channels
-     * 
-     * @param channels the channels to set
-     */
-    public void setChannels(String channels) {
-        this.channels = channels;
-    }
-
-    /**
-     * get selectorType
-     * 
-     * @return the selectorType
-     */
-    public String getSelectorType() {
-        return selectorType;
-    }
-
-    /**
-     * set selectorType
-     * 
-     * @param selectorType the selectorType to set
-     */
-    public void setSelectorType(String selectorType) {
-        this.selectorType = selectorType;
-    }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSourceExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSourceExt.java
deleted file mode 100644
index 5891e00f6..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSourceExt.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeSourceExt
- */
-public class FlumeSourceExt {
-    private String parentName;
-    private String setName;
-    private String keyName;
-    private String keyValue;
-    private Integer isDeleted;
-
-    /**
-     * get parentName
-     * 
-     * @return the parentName
-     */
-    public String getParentName() {
-        return parentName;
-    }
-
-    /**
-     * set parentName
-     * 
-     * @param parentName the parentName to set
-     */
-    public void setParentName(String parentName) {
-        this.parentName = parentName;
-    }
-
-    /**
-     * get setName
-     * 
-     * @return the setName
-     */
-    public String getSetName() {
-        return setName;
-    }
-
-    /**
-     * set setName
-     * 
-     * @param setName the setName to set
-     */
-    public void setSetName(String setName) {
-        this.setName = setName;
-    }
-
-    /**
-     * get keyName
-     * 
-     * @return the keyName
-     */
-    public String getKeyName() {
-        return keyName;
-    }
-
-    /**
-     * set keyName
-     * 
-     * @param keyName the keyName to set
-     */
-    public void setKeyName(String keyName) {
-        this.keyName = keyName;
-    }
-
-    /**
-     * get keyValue
-     * 
-     * @return the keyValue
-     */
-    public String getKeyValue() {
-        return keyValue;
-    }
-
-    /**
-     * set keyValue
-     * 
-     * @param keyValue the keyValue to set
-     */
-    public void setKeyValue(String keyValue) {
-        this.keyValue = keyValue;
-    }
-
-    /**
-     * get isDeleted
-     * 
-     * @return the isDeleted
-     */
-    public Integer getIsDeleted() {
-        return isDeleted;
-    }
-
-    /**
-     * set isDeleted
-     * 
-     * @param isDeleted the isDeleted to set
-     */
-    public void setIsDeleted(Integer isDeleted) {
-        this.isDeleted = isDeleted;
-    }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyCluster.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyCluster.java
deleted file mode 100644
index f1c0660ab..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyCluster.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * ProxyCluster
- */
-public class ProxyCluster {
-    private String clusterName;
-    private String setName;
-    private String zone;
-
-    /**
-     * get clusterName
-     * 
-     * @return the clusterName
-     */
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    /**
-     * set clusterName
-     * 
-     * @param clusterName the clusterName to set
-     */
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-    /**
-     * get setName
-     * 
-     * @return the setName
-     */
-    public String getSetName() {
-        return setName;
-    }
-
-    /**
-     * set setName
-     * 
-     * @param setName the setName to set
-     */
-    public void setSetName(String setName) {
-        this.setName = setName;
-    }
-
-    /**
-     * get zone
-     * 
-     * @return the zone
-     */
-    public String getZone() {
-        return zone;
-    }
-
-    /**
-     * set zone
-     * 
-     * @param zone the zone to set
-     */
-    public void setZone(String zone) {
-        this.zone = zone;
-    }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
index 74ec7153a..090affdc6 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
@@ -17,52 +17,25 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
-import java.util.List;
-
-import org.apache.inlong.manager.dao.entity.CacheCluster;
-import org.apache.inlong.manager.dao.entity.CacheClusterExt;
-import org.apache.inlong.manager.dao.entity.CacheTopic;
-import org.apache.inlong.manager.dao.entity.ClusterSet;
-import org.apache.inlong.manager.dao.entity.FlumeChannel;
-import org.apache.inlong.manager.dao.entity.FlumeChannelExt;
-import org.apache.inlong.manager.dao.entity.FlumeSink;
-import org.apache.inlong.manager.dao.entity.FlumeSinkExt;
-import org.apache.inlong.manager.dao.entity.FlumeSource;
-import org.apache.inlong.manager.dao.entity.FlumeSourceExt;
-import org.apache.inlong.manager.dao.entity.InLongId;
-import org.apache.inlong.manager.dao.entity.ProxyCluster;
-import org.apache.inlong.manager.dao.entity.ProxyClusterToCacheCluster;
+import org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster;
+import org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId;
+import org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId;
+import org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster;
 import org.springframework.stereotype.Repository;
 
+import java.util.List;
+
 /**
  * ClusterSetMapper
  */
 @Repository
 public interface ClusterSetMapper {
-    List<ClusterSet> selectClusterSet();
-
-    List<InLongId> selectInlongId();
-
-    List<CacheCluster> selectCacheCluster();
-
-    List<CacheClusterExt> selectCacheClusterExt();
-
-    List<CacheTopic> selectCacheTopic();
 
     List<ProxyCluster> selectProxyCluster();
 
-    List<ProxyClusterToCacheCluster> selectProxyClusterToCacheCluster();
-
-    List<FlumeSource> selectFlumeSource();
-
-    List<FlumeSourceExt> selectFlumeSourceExt();
-
-    List<FlumeChannel> selectFlumeChannel();
-
-    List<FlumeChannelExt> selectFlumeChannelExt();
-
-    List<FlumeSink> selectFlumeSink();
+    List<CacheCluster> selectCacheCluster();
 
-    List<FlumeSinkExt> selectFlumeSinkExt();
+    List<InlongGroupId> selectInlongGroupId();
 
+    List<InlongStreamId> selectInlongStreamId();
 }
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 2eb1b176e..a52f7dadc 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -19,79 +19,40 @@
 -->
 
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.manager.dao.mapper.ClusterSetMapper">
-    <select id="selectClusterSet" resultType="org.apache.inlong.manager.dao.entity.ClusterSet">
-        select set_name,
-               cn_name,
-               description,
-               middleware_type,
-               in_charges,
-               followers,
-               status,
-               is_deleted,
-               creator,
-               modifier
-        from cluster_set
+<mapper
+    namespace="org.apache.inlong.manager.dao.mapper.ClusterSetMapper">
+    <select id="selectProxyCluster"
+        resultType="org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster">
+        select name as cluster_name, cluster_tag, ext_tag, ext_params
+        from inlong_cluster
+        where type='DATA_PROXY'
+        and is_deleted='0'
+    </select>
+    <select id="selectCacheCluster"
+        resultType="org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster">
+        select name as cluster_name, type, cluster_tag, ext_tag,
+        ext_params
+        from inlong_cluster
+        where type in ('PULSAR','KAFKA','TUBE')
+        and is_deleted='0'
+    </select>
+    <select id="selectInlongGroupId"
+        resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId">
+        select inlong_group_id,
+        inlong_cluster_tag as
+        cluster_tag,
+        mq_resource as topic,
+        ext_params
+        from inlong_group
         where is_deleted = 0
     </select>
-    <select id="selectInlongId" resultType="org.apache.inlong.manager.dao.entity.InLongId">
-        select stream.inlong_stream_id                          as inlong_id,
-               stream.mq_resource                               as topic,
-               concat('fieldDelimiter=', stream.data_separator) as params,
-               c.set_name                                       as set_name
-        from inlong_stream stream,
-             cluster_set_inlongid c
-        where stream.is_deleted = 0
-          and stream.inlong_group_id = c.inlong_group_id
-    </select>
-    <select id="selectCacheCluster" resultType="org.apache.inlong.manager.dao.entity.CacheCluster">
-        select cluster_name, set_name, zone
-        from cache_cluster
-    </select>
-    <select id="selectCacheClusterExt" resultType="org.apache.inlong.manager.dao.entity.CacheClusterExt">
-        select cluster_name, key_name, key_value, is_deleted
-        from cache_cluster_ext
-        where is_deleted = 0
-    </select>
-
-    <select id="selectCacheTopic" resultType="org.apache.inlong.manager.dao.entity.CacheTopic">
-        select topic_name, set_name, partition_num
-        from cache_topic
-    </select>
-    <select id="selectProxyCluster" resultType="org.apache.inlong.manager.dao.entity.ProxyCluster">
-        select cluster_name, set_name, zone
-        from proxy_cluster
-    </select>
-    <select id="selectProxyClusterToCacheCluster"
-            resultType="org.apache.inlong.manager.dao.entity.ProxyClusterToCacheCluster">
-        select proxy_cluster_name, cache_cluster_name
-        from proxy_cluster_to_cache_cluster
-    </select>
-    <select id="selectFlumeSource" resultType="org.apache.inlong.manager.dao.entity.FlumeSource">
-        select source_name, set_name, type, channels, selector_type
-        from flume_source
-    </select>
-    <select id="selectFlumeSourceExt" resultType="org.apache.inlong.manager.dao.entity.FlumeSourceExt">
-        select parent_name, set_name, key_name, key_value, is_deleted
-        from flume_source_ext
-        where is_deleted = 0
-    </select>
-    <select id="selectFlumeChannel" resultType="org.apache.inlong.manager.dao.entity.FlumeChannel">
-        select channel_name, set_name, type
-        from flume_channel
-    </select>
-    <select id="selectFlumeChannelExt" resultType="org.apache.inlong.manager.dao.entity.FlumeChannelExt">
-        select parent_name, set_name, key_name, key_value, is_deleted
-        from flume_channel_ext
-        where is_deleted = 0
-    </select>
-    <select id="selectFlumeSink" resultType="org.apache.inlong.manager.dao.entity.FlumeSink">
-        select sink_name, set_name, type, channel
-        from flume_sink
-    </select>
-    <select id="selectFlumeSinkExt" resultType="org.apache.inlong.manager.dao.entity.FlumeSinkExt">
-        select parent_name, set_name, key_name, key_value, is_deleted
-        from flume_sink_ext
+    <select id="selectInlongStreamId"
+        resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId">
+        select inlong_group_id,
+        inlong_stream_id,
+        mq_resource as topic,
+        ext_params
+        from inlong_stream
         where is_deleted = 0
     </select>
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java
index c235ff343..edb74c764 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java
@@ -27,6 +27,6 @@ public interface DataProxyClusterService {
      *
      * @return data proxy config
      */
-    String getAllConfig(String clusterName, String setName, String md5);
+    String getAllConfig(String clusterName, String md5);
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
index 69b0147d4..bd7d801a3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
@@ -18,16 +18,16 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import com.google.gson.Gson;
-import lombok.extern.slf4j.Slf4j;
 
 import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyClusterSet;
 import org.apache.inlong.manager.service.core.DataProxyClusterService;
 import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import lombok.extern.slf4j.Slf4j;
+
 /**
  * DataProxy cluster service layer implementation class
  */
@@ -43,12 +43,8 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
      *
      * @return data proxy config
      */
-    public String getAllConfig(String clusterName, String setName, String md5) {
-        DataProxyClusterSet setObj = proxyRepository.getDataProxyClusterSet(setName);
-        if (setObj == null) {
-            return this.getErrorAllConfig();
-        }
-        String configMd5 = setObj.getMd5Map().get(clusterName);
+    public String getAllConfig(String clusterName, String md5) {
+        String configMd5 = proxyRepository.getProxyMd5(clusterName);
         if (configMd5 == null) {
             return this.getErrorAllConfig();
         }
@@ -62,7 +58,7 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
             Gson gson = new Gson();
             return gson.toJson(response);
         }
-        String configJson = setObj.getProxyConfigJson().get(clusterName);
+        String configJson = proxyRepository.getProxyConfigJson(clusterName);
         if (configJson == null) {
             return this.getErrorAllConfig();
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 41299c362..60c4da500 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -19,33 +19,21 @@ package org.apache.inlong.manager.service.repository;
 
 import com.google.common.base.Splitter;
 import com.google.gson.Gson;
+
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
-import org.apache.inlong.common.pojo.dataproxy.CacheTopicObject;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
 import org.apache.inlong.common.pojo.dataproxy.IRepository;
 import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
-import org.apache.inlong.common.pojo.dataproxy.ProxyChannel;
 import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
-import org.apache.inlong.common.pojo.dataproxy.ProxySink;
-import org.apache.inlong.common.pojo.dataproxy.ProxySource;
 import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyClusterSet;
-import org.apache.inlong.manager.dao.entity.CacheCluster;
-import org.apache.inlong.manager.dao.entity.CacheClusterExt;
-import org.apache.inlong.manager.dao.entity.CacheTopic;
-import org.apache.inlong.manager.dao.entity.ClusterSet;
-import org.apache.inlong.manager.dao.entity.FlumeChannel;
-import org.apache.inlong.manager.dao.entity.FlumeChannelExt;
-import org.apache.inlong.manager.dao.entity.FlumeSink;
-import org.apache.inlong.manager.dao.entity.FlumeSinkExt;
-import org.apache.inlong.manager.dao.entity.FlumeSource;
-import org.apache.inlong.manager.dao.entity.FlumeSourceExt;
-import org.apache.inlong.manager.dao.entity.InLongId;
-import org.apache.inlong.manager.dao.entity.ProxyCluster;
-import org.apache.inlong.manager.dao.entity.ProxyClusterToCacheCluster;
+import org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster;
+import org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId;
+import org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId;
+import org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster;
 import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,31 +41,35 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 import org.springframework.transaction.annotation.Transactional;
 
-import javax.annotation.PostConstruct;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
 
 /**
  * DataProxyConfigRepository
  */
-@SuppressWarnings("UnstableApiUsage")
 @Repository(value = "dataProxyConfigRepository")
 public class DataProxyConfigRepository implements IRepository {
 
     public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(SEPARATOR).trimResults()
             .withKeyValueSeparator(KEY_VALUE_SEPARATOR);
     private static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class);
+    public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
+    public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
     private static final Gson gson = new Gson();
 
-    private final Map<String, ProxyClusterObject> proxyClusterMap = new HashMap<>();
-    private final Map<String, CacheClusterObject> cacheClusterMap = new HashMap<>();
-    private Map<String, DataProxyClusterSet> clusterSets = new HashMap<>();
+    // key: proxyClusterName, value: jsonString
+    private Map<String, String> proxyConfigJson = new ConcurrentHashMap<>();
+    // key: proxyClusterName, value: md5
+    private Map<String, String> proxyMd5Map = new ConcurrentHashMap<>();
+
     private long reloadInterval;
 
     @Autowired
@@ -102,348 +94,231 @@ public class DataProxyConfigRepository implements IRepository {
     @Transactional(rollbackFor = Exception.class)
     public void reload() {
         LOGGER.info("start to reload config.");
-        List<ClusterSet> setList = clusterSetMapper.selectClusterSet();
-        if (setList.size() == 0) {
+        Map<String, ProxyClusterObject> proxyClusterMap = this.reloadProxyCluster();
+        if (proxyClusterMap.size() == 0) {
             return;
         }
-
-        Map<String, DataProxyClusterSet> newClusterSets = new HashMap<>();
-        for (ClusterSet set : setList) {
-            String setName = set.getSetName();
-            DataProxyClusterSet setObj = new DataProxyClusterSet();
-            setObj.setSetName(setName);
-            setObj.getCacheClusterSet().setSetName(setName);
-            setObj.getCacheClusterSet().setType(set.getMiddlewareType());
-            newClusterSets.put(setName, setObj);
+        Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = this.reloadCacheCluster();
+        Map<String, List<InLongIdObject>> inlongIdMap = this.reloadInlongId();
+        // attach to each proxy cluster the inlong ids that share its cluster tag
+        for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
+            String clusterTag = entry.getValue().getSetName();
+            List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
+            if (inlongIds != null) {
+                entry.getValue().setInlongIds(inlongIds);
+            }
         }
-        this.proxyClusterMap.clear();
-        this.cacheClusterMap.clear();
-        this.reloadCacheCluster(newClusterSets);
-        this.reloadCacheClusterExt(newClusterSets);
-        this.reloadCacheTopic(newClusterSets);
-        this.reloadFlumeChannel(newClusterSets);
-        this.reloadFlumeChannelExt(newClusterSets);
-        this.reloadFlumeSource(newClusterSets);
-        this.reloadFlumeSourceExt(newClusterSets);
-        this.reloadFlumeSink(newClusterSets);
-        this.reloadFlumeSinkExt(newClusterSets);
-        // reload inlongid
-        this.reloadInlongId(newClusterSets);
-        this.reloadProxyCluster(newClusterSets);
-        this.reloadProxy2Cache(newClusterSets);
-        this.generateClusterJson(newClusterSets);
 
-        // replace
-        this.clusterSets = newClusterSets;
+        // generateClusterJson
+        this.generateClusterJson(proxyClusterMap, cacheClusterMap);
 
         LOGGER.info("end to reload config.");
     }
 
-    /**
-     * setReloadTimer
-     */
-    private void setReloadTimer() {
-        Timer reloadTimer = new Timer(true);
-        TimerTask task = new RepositoryTimerTask<DataProxyConfigRepository>(this);
-        reloadTimer.scheduleAtFixedRate(task, reloadInterval, reloadInterval);
-    }
-
-    /**
-     * get clusterSets
-     *
-     * @return the clusterSets
-     */
-    public Map<String, DataProxyClusterSet> getClusterSets() {
-        return clusterSets;
-    }
-
-    /**
-     * reloadCacheCluster
-     *
-     * @param newClusterSets
-     */
-    private void reloadCacheCluster(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) {
-            CacheClusterObject obj = new CacheClusterObject();
-            obj.setName(cacheCluster.getClusterName());
-            obj.setZone(cacheCluster.getZone());
-            cacheClusterMap.put(obj.getName(), obj);
-            String setName = cacheCluster.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            setObj.getCacheClusterSet().getCacheClusters().add(obj);
-        }
-    }
-
-    /**
-     * getOrCreateDataProxyClusterSet
-     *
-     * @param clusterSets
-     * @param setName
-     * @return
-     */
-    private DataProxyClusterSet getOrCreateDataProxyClusterSet(Map<String, DataProxyClusterSet> clusterSets,
-            String setName) {
-        DataProxyClusterSet setObj = clusterSets.get(setName);
-        if (setObj == null) {
-            setObj = new DataProxyClusterSet();
-            setObj.setSetName(setName);
-            clusterSets.put(setName, setObj);
-        }
-        return setObj;
-    }
-
-    /**
-     * reloadCacheClusterExt
-     *
-     * @param newClusterSets
-     */
-    private void reloadCacheClusterExt(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (CacheClusterExt ext : clusterSetMapper.selectCacheClusterExt()) {
-            String clusterName = ext.getClusterName();
-            CacheClusterObject cacheClusterObject = cacheClusterMap.get(clusterName);
-            if (cacheClusterObject != null) {
-                cacheClusterObject.getParams().put(ext.getKeyName(), ext.getKeyValue());
-            }
-        }
-    }
-
-    /**
-     * reloadCacheTopic
-     *
-     * @param newClusterSets
-     */
-    private void reloadCacheTopic(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (CacheTopic cacheTopic : clusterSetMapper.selectCacheTopic()) {
-            CacheTopicObject obj = new CacheTopicObject();
-            obj.setTopic(cacheTopic.getTopicName());
-            obj.setPartitionNum(cacheTopic.getPartitionNum());
-            String setName = cacheTopic.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            setObj.getCacheClusterSet().getTopics().add(obj);
-        }
-    }
-
     /**
      * reloadProxyCluster
-     *
-     * @param newClusterSets
      */
-    private void reloadProxyCluster(Map<String, DataProxyClusterSet> newClusterSets) {
+    private Map<String, ProxyClusterObject> reloadProxyCluster() {
+        Map<String, ProxyClusterObject> proxyClusterMap = new HashMap<>();
         for (ProxyCluster proxyCluster : clusterSetMapper.selectProxyCluster()) {
-            String setName = proxyCluster.getSetName();
             ProxyClusterObject obj = new ProxyClusterObject();
             obj.setName(proxyCluster.getClusterName());
-            obj.setSetName(setName);
-            obj.setZone(proxyCluster.getZone());
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            setObj.getProxyClusterList().add(obj);
-            this.proxyClusterMap.put(obj.getName(), obj);
-            // channels
-            obj.getChannels().addAll(setObj.getProxyChannelMap().values());
-            // inlongids
-            obj.getInlongIds().addAll(setObj.getInlongIds());
-            // sinks
-            obj.getSinks().addAll(setObj.getProxySinkMap().values());
-            // sources
-            obj.getSources().addAll(setObj.getProxySourceMap().values());
+            obj.setSetName(proxyCluster.getClusterTag());
+            obj.setZone(proxyCluster.getExtTag());
+            proxyClusterMap.put(obj.getName(), obj);
         }
+        return proxyClusterMap;
     }
 
     /**
-     * reloadFlumeChannel
-     *
-     * @param newClusterSets
-     */
-    private void reloadFlumeChannel(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (FlumeChannel flumeChannel : clusterSetMapper.selectFlumeChannel()) {
-            ProxyChannel obj = new ProxyChannel();
-            obj.setName(flumeChannel.getChannelName());
-            obj.setType(flumeChannel.getType());
-            String setName = flumeChannel.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            setObj.getProxyChannelMap().put(obj.getName(), obj);
-        }
-    }
-
-    /**
-     * reloadFlumeChannelExt
-     *
-     * @param newClusterSets
+     * Load the cache clusters usable as producers, grouped by cluster tag and then by ext tag.
      */
-    private void reloadFlumeChannelExt(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (FlumeChannelExt ext : clusterSetMapper.selectFlumeChannelExt()) {
-            String setName = ext.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            ProxyChannel obj = setObj.getProxyChannelMap().get(ext.getParentName());
-            if (obj != null) {
-                obj.getParams().put(ext.getKeyName(), ext.getKeyValue());
+    private Map<String, Map<String, List<CacheCluster>>> reloadCacheCluster() {
+        Map<String, Map<String, List<CacheCluster>>> result = new HashMap<>(); // clusterTag -> extTag -> producers
+        for (CacheCluster cluster : clusterSetMapper.selectCacheCluster()) {
+            final String extTag = cluster.getExtTag();
+            String flag = MAP_SPLITTER.split(extTag).getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString());
+            if (StringUtils.equalsIgnoreCase(flag, Boolean.TRUE.toString())) {
+                result.computeIfAbsent(cluster.getClusterTag(), tag -> new HashMap<>())
+                        .computeIfAbsent(extTag, zone -> new ArrayList<>()).add(cluster);
             }
         }
+        return result;
     }
 
     /**
-     * reloadFlumeSource
-     *
-     * @param newClusterSets
-     */
-    private void reloadFlumeSource(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (FlumeSource flumeSource : clusterSetMapper.selectFlumeSource()) {
-            ProxySource obj = new ProxySource();
-            obj.setName(flumeSource.getSourceName());
-            obj.setSelectorType(flumeSource.getSelectorType());
-            obj.setType(flumeSource.getType());
-            String channels = flumeSource.getChannels();
-            obj.getChannels().addAll(Arrays.asList(channels.split("\\s+")));
-            String setName = flumeSource.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            setObj.getProxySourceMap().put(obj.getName(), obj);
-        }
-    }
-
-    /**
-     * reloadFlumeSourceExt
-     *
-     * @param newClusterSets
+     * Build the inlong ids (groupId.streamId with topic and params) of every cluster tag.
      */
-    private void reloadFlumeSourceExt(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (FlumeSourceExt ext : clusterSetMapper.selectFlumeSourceExt()) {
-            String setName = ext.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            ProxySource obj = setObj.getProxySourceMap().get(ext.getParentName());
-            if (obj != null) {
-                obj.getParams().put(ext.getKeyName(), ext.getKeyValue());
+    @SuppressWarnings("unchecked")
+    private Map<String, List<InLongIdObject>> reloadInlongId() {
+        // index all groups by inlongGroupId, so that each stream can look up its group
+        Map<String, InlongGroupId> groupIdMap = new HashMap<>();
+        clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
+        // key: clusterTag of the group, value: inlong ids of the streams of these groups
+        Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
+        for (InlongStreamId streamIdObj : clusterSetMapper.selectInlongStreamId()) {
+            String groupId = streamIdObj.getInlongGroupId();
+            InlongGroupId groupIdObj = groupIdMap.get(groupId);
+            if (groupIdObj == null) {
+                // no such group was loaded: skip the stream instead of failing the reload with an NPE
+                continue;
+            }
+            // choose topic: the stream topic, the group topic, or both joined when both are set
+            String groupTopic = groupIdObj.getTopic();
+            String streamTopic = streamIdObj.getTopic();
+            String finalTopic;
+            if (StringUtils.isEmpty(groupTopic)) {
+                if (StringUtils.isEmpty(streamTopic)) {
+                    // both empty then ignore
+                    continue;
+                }
+                finalTopic = streamTopic;
+            } else if (StringUtils.isEmpty(streamTopic)) {
+                finalTopic = groupTopic;
+            } else {
+                // Pulsar: namespace+topic
+                finalTopic = groupTopic + "/" + streamTopic;
+            }
+            // concat id
+            InLongIdObject obj = new InLongIdObject();
+            String inlongId = groupId + "." + streamIdObj.getInlongStreamId();
+            obj.setInlongId(inlongId);
+            obj.setTopic(finalTopic);
+            Map<String, String> params = new HashMap<>();
+            obj.setParams(params);
+            // params: the group extParams (JSON object) are applied first, then the stream extParams,
+            // so a stream value overrides the group value of the same key
+            if (!StringUtils.isEmpty(groupIdObj.getExtParams())) {
+                try {
+                    Map<String, String> groupParams = gson.fromJson(groupIdObj.getExtParams(), Map.class);
+                    params.putAll(groupParams);
+                } catch (Exception e) {
+                    // unparsable extParams only drop these params; the inlong id itself is still published
+                    LOGGER.error(e.getMessage(), e);
+                }
             }
+            // parse stream extparams
+            if (!StringUtils.isEmpty(streamIdObj.getExtParams())) {
+                try {
+                    Map<String, String> streamParams = gson.fromJson(streamIdObj.getExtParams(), Map.class);
+                    params.putAll(streamParams);
+                } catch (Exception e) {
+                    LOGGER.error(e.getMessage(), e);
+                }
+            }
+            inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj);
         }
+        return inlongIdMap;
     }
 
     /**
-     * reloadFlumeSink
-     *
-     * @param newClusterSets
+     * Schedule a RepositoryTimerTask for this repository on a daemon timer, with reloadInterval as delay and period.
      */
-    private void reloadFlumeSink(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (FlumeSink flumeSink : clusterSetMapper.selectFlumeSink()) {
-            ProxySink obj = new ProxySink();
-            obj.setName(flumeSink.getSinkName());
-            obj.setType(flumeSink.getType());
-            obj.setChannel(flumeSink.getChannel());
-            String setName = flumeSink.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            setObj.getProxySinkMap().put(obj.getName(), obj);
-        }
+    private void setReloadTimer() {
+        final Timer daemonTimer = new Timer(true); // true: run as a daemon thread
+        final TimerTask repositoryTask = new RepositoryTimerTask<DataProxyConfigRepository>(this);
+        daemonTimer.scheduleAtFixedRate(repositoryTask, this.reloadInterval, this.reloadInterval);
     }
 
     /**
-     * reloadFlumeSinkExt
+     * Generate the response JSON and MD5 of each proxy cluster, then replace the cached maps.
      *
      * @param newClusterSets
      */
-    private void reloadFlumeSinkExt(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (FlumeSinkExt ext : clusterSetMapper.selectFlumeSinkExt()) {
-            String setName = ext.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            ProxySink obj = setObj.getProxySinkMap().get(ext.getParentName());
-            if (obj != null) {
-                obj.getParams().put(ext.getKeyName(), ext.getKeyValue());
+    @SuppressWarnings("unchecked")
+    private void generateClusterJson(Map<String, ProxyClusterObject> proxyClusterMap,
+            Map<String, Map<String, List<CacheCluster>>> cacheClusterMap) {
+        Map<String, String> newProxyConfigJson = new ConcurrentHashMap<>();
+        Map<String, String> newProxyMd5Map = new ConcurrentHashMap<>();
+        Map<String, Map<String, String>> tagCache = new HashMap<>();
+        for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
+            ProxyClusterObject proxyObj = entry.getValue();
+            // proxy
+            DataProxyCluster clusterObj = new DataProxyCluster();
+            clusterObj.setProxyCluster(proxyObj);
+            // cache
+            String clusterTag = proxyObj.getSetName();
+            String extTag = proxyObj.getZone();
+            Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
+            if (cacheClusterZoneMap != null) {
+                Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));
+                for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
+                    if (cacheEntry.getValue().size() == 0) {
+                        continue;
+                    }
+                    Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(),
+                            k -> MAP_SPLITTER.split(cacheEntry.getKey()));
+                    if (isSubTag(wholeTagMap, subTagMap)) {
+                        CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
+                        cacheSet.setSetName(clusterTag);
+                        List<CacheCluster> cacheClusterList = cacheEntry.getValue();
+                        cacheSet.setType(cacheClusterList.get(0).getType());
+                        List<CacheClusterObject> cacheClusters = new ArrayList<>(cacheClusterList.size());
+                        cacheSet.setCacheClusters(cacheClusters);
+                        for (CacheCluster cacheCluster : cacheClusterList) {
+                            CacheClusterObject obj = new CacheClusterObject();
+                            obj.setName(cacheCluster.getClusterName());
+                            obj.setZone(cacheCluster.getExtTag());
+                            try {
+                                Map<String, String> params = gson.fromJson(cacheCluster.getExtParams(), Map.class);
+                                obj.setParams(params);
+                            } catch (Exception e) {
+                                LOGGER.error(e.getMessage(), e);
+                            }
+                            cacheClusters.add(obj);
+                        }
+                    }
+                }
             }
+            // json
+            String jsonDataProxyCluster = gson.toJson(clusterObj);
+            String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
+            DataProxyConfigResponse response = new DataProxyConfigResponse();
+            response.setResult(true);
+            response.setErrCode(DataProxyConfigResponse.SUCC);
+            response.setMd5(md5);
+            response.setData(clusterObj);
+            String jsonResponse = gson.toJson(response);
+            newProxyConfigJson.put(proxyObj.getName(), jsonResponse);
+            newProxyMd5Map.put(proxyObj.getName(), md5);
         }
-    }
 
-    /**
-     * reloadInlongId
-     *
-     * @param newClusterSets
-     */
-    private void reloadInlongId(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (InLongId inlongId : clusterSetMapper.selectInlongId()) {
-            InLongIdObject obj = new InLongIdObject();
-            obj.setInlongId(inlongId.getInlongId());
-            obj.setTopic(inlongId.getTopic());
-            if (inlongId.getParams() != null) {
-                Map<String, String> params = MAP_SPLITTER.split(inlongId.getParams());
-                obj.getParams().putAll(params);
-            }
-            String setName = inlongId.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            setObj.getInlongIds().add(obj);
-        }
+        // replace
+        this.proxyConfigJson = newProxyConfigJson;
+        this.proxyMd5Map = newProxyMd5Map;
     }
 
     /**
-     * reloadInlongId
-     *
-     * @param newClusterSets
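+     * Check whether every key/value pair of subTagMap is also present in wholeTagMap.
+     * @param wholeTagMap the tag map that must contain all required pairs
+     * @param subTagMap the required tag pairs; an empty map always matches
+     * @return true if wholeTagMap holds an equal value for every key of subTagMap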
      */
-    private void reloadProxy2Cache(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (ProxyClusterToCacheCluster proxy2Cache : clusterSetMapper.selectProxyClusterToCacheCluster()) {
-            String proxyClusterName = proxy2Cache.getProxyClusterName();
-            String cacheClusterName = proxy2Cache.getCacheClusterName();
-            ProxyClusterObject proxyObj = this.proxyClusterMap.get(proxyClusterName);
-            if (proxyObj == null) {
-                continue;
-            }
-            CacheClusterObject cacheObj = this.cacheClusterMap.get(cacheClusterName);
-            if (cacheObj == null) {
-                continue;
+    private boolean isSubTag(Map<String, String> wholeTagMap, Map<String, String> subTagMap) {
+        for (Entry<String, String> entry : subTagMap.entrySet()) {
+            String value = wholeTagMap.get(entry.getKey());
+            if (value == null || !value.equals(entry.getValue())) {
+                return false;
             }
-            String setName = proxyObj.getSetName();
-            DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
-            setObj.addProxy2Cache(proxyClusterName, cacheClusterName);
         }
+        return true;
     }
 
     /**
-     * generateClusterJson
-     *
-     * @param newClusterSets
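+     * Get the MD5 of the configuration generated for a proxy cluster by the last reload.
+     * @param clusterName name of the proxy cluster
+     * @return the MD5 string, or null if no configuration exists for this cluster name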
      */
-    private void generateClusterJson(Map<String, DataProxyClusterSet> newClusterSets) {
-        for (Entry<String, DataProxyClusterSet> entry : newClusterSets.entrySet()) {
-            for (ProxyClusterObject proxyObj : entry.getValue().getProxyClusterList()) {
-                // proxy
-                DataProxyCluster clusterObj = new DataProxyCluster();
-                clusterObj.setProxyCluster(proxyObj);
-                // cache
-                CacheClusterSetObject allCacheCluster = entry.getValue().getCacheClusterSet();
-                CacheClusterSetObject proxyCacheClusterSet = clusterObj.getCacheClusterSet();
-                proxyCacheClusterSet.setSetName(allCacheCluster.getSetName());
-                proxyCacheClusterSet.setType(allCacheCluster.getType());
-                proxyCacheClusterSet.setTopics(allCacheCluster.getTopics());
-                // cacheCluster
-                Set<String> cacheClusterNameSet = entry.getValue().getProxy2Cache().get(proxyObj.getName());
-                if (cacheClusterNameSet != null) {
-                    for (String cacheClusterName : cacheClusterNameSet) {
-                        CacheClusterObject cacheObj = this.cacheClusterMap.get(cacheClusterName);
-                        if (cacheObj == null) {
-                            continue;
-                        }
-                        proxyCacheClusterSet.getCacheClusters().add(cacheObj);
-                    }
-                }
-                //
-                String jsonDataProxyCluster = gson.toJson(clusterObj);
-                String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
-                DataProxyConfigResponse response = new DataProxyConfigResponse();
-                response.setResult(true);
-                response.setErrCode(DataProxyConfigResponse.SUCC);
-                response.setMd5(md5);
-                response.setData(clusterObj);
-                String jsonResponse = gson.toJson(response);
-                entry.getValue().getProxyConfigJson().put(proxyObj.getName(), jsonResponse);
-                entry.getValue().getMd5Map().put(proxyObj.getName(), md5);
-                entry.getValue().setDefaultConfigJson(jsonResponse);
-            }
-        }
+    public String getProxyMd5(String clusterName) {
+        return this.proxyMd5Map.get(clusterName);
     }
 
     /**
-     * getDataProxyClusterSet
-     *
-     * @param setName
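+     * Get the response JSON generated for a proxy cluster by the last reload; null if the name is unknown.
+     * @param clusterName name of the proxy cluster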
      * @return
      */
-    public DataProxyClusterSet getDataProxyClusterSet(String setName) {
-        return this.clusterSets.get(setName);
+    public String getProxyConfigJson(String clusterName) {
+        return this.proxyConfigJson.get(clusterName);
     }
-
 }
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 52adb2658..72d352272 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -838,209 +838,6 @@ CREATE TABLE IF NOT EXISTS `workflow_task`
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow task table';
 
--- ----------------------------
--- Table structure for cluster_set
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cluster_set`
-(
-    `id`              int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `set_name`        varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `cn_name`         varchar(256) COMMENT 'Chinese display name',
-    `description`     varchar(256) COMMENT 'ClusterSet Introduction',
-    `middleware_type` varchar(10)       DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
-    `in_charges`      varchar(512) COMMENT 'Name of responsible person, separated by commas',
-    `followers`       varchar(512) COMMENT 'Name of followers, separated by commas',
-    `status`          int(4)            DEFAULT '21' COMMENT 'ClusterSet status',
-    `is_deleted`      int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`        varchar(64)  NULL COMMENT 'Modifier name',
-    `create_time`     timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`     timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_cluster_set` (`set_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='ClusterSet table';
-
--- ----------------------------
--- Table structure for cluster_set_inlongid
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cluster_set_inlongid`
-(
-    `id`              int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `set_name`        varchar(256) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, filled in by the user, undeleted ones cannot be repeated',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_cluster_set_inlongid` (`set_name`, `inlong_group_id`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='InlongId table';
-
--- ----------------------------
--- Table structure for cache_cluster
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cache_cluster`
-(
-    `id`           int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
-    `set_name`     varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `zone`         varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_cache_cluster` (`cluster_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='CacheCluster table';
-
--- ----------------------------
--- Table structure for cache_cluster_ext
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cache_cluster_ext`
-(
-    `id`           int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
-    `key_name`     varchar(256) NOT NULL COMMENT 'Configuration item name',
-    `key_value`    text         NULL COMMENT 'The value of the configuration item',
-    `is_deleted`   int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time`  timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`),
-    KEY `index_cache_cluster` (`cluster_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='CacheCluster extension table';
-
--- ----------------------------
--- Table structure for cache_topic
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cache_topic`
-(
-    `id`            int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `topic_name`    varchar(128) NOT NULL COMMENT 'Topic name, English, numbers and underscore',
-    `set_name`      varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `partition_num` int(11)      NOT NULL COMMENT 'Partition number',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_cache_topic` (`topic_name`, `set_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='CacheTopic table';
-
--- ----------------------------
--- Table structure for proxy_cluster
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `proxy_cluster`
-(
-    `id`           int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
-    `set_name`     varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `zone`         varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_proxy_cluster` (`cluster_name`, `set_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='ProxyCluster table';
-
--- ----------------------------
--- Table structure for proxy_cluster_to_cache_cluster
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `proxy_cluster_to_cache_cluster`
-(
-    `id`                 int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `proxy_cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
-    `cache_cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_proxy_cluster_to_cache_cluster` (`proxy_cluster_name`, `cache_cluster_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='The relation table of ProxyCluster and CacheCluster';
-
--- ----------------------------
--- Table structure for flume_source
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_source`
-(
-    `id`            int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `source_name`   varchar(128) NOT NULL COMMENT 'FlumeSource name, English, numbers and underscore',
-    `set_name`      varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `type`          varchar(128) NOT NULL COMMENT 'FlumeSource classname',
-    `channels`      varchar(128) NOT NULL COMMENT 'The channels of FlumeSource, separated by space',
-    `selector_type` varchar(128) NOT NULL COMMENT 'FlumeSource channel selector classname',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_flume_source` (`source_name`, `set_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSource table';
-
--- ----------------------------
--- Table structure for flume_source_ext
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_source_ext`
-(
-    `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `parent_name` varchar(128) NOT NULL COMMENT 'FlumeSource name, English, numbers and underscore',
-    `set_name`    varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `key_name`    varchar(256) NOT NULL COMMENT 'Configuration item name',
-    `key_value`   text         NULL COMMENT 'The value of the configuration item',
-    `is_deleted`  int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`),
-    KEY `index_flume_source_ext` (`parent_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSource extension table';
-
--- ----------------------------
--- Table structure for flume_channel
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_channel`
-(
-    `id`           int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `channel_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore',
-    `set_name`     varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `type`         varchar(128) NOT NULL COMMENT 'FlumeChannel classname',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_flume_channel` (`channel_name`, `set_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeChannel table';
-
--- ----------------------------
--- Table structure for flume_channel_ext
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_channel_ext`
-(
-    `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `parent_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore',
-    `set_name`    varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `key_name`    varchar(256) NOT NULL COMMENT 'Configuration item name',
-    `key_value`   text         NULL COMMENT 'The value of the configuration item',
-    `is_deleted`  int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`),
-    KEY `index_flume_channel_ext` (`parent_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeChannel extension table';
-
--- ----------------------------
--- Table structure for flume_sink
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_sink`
-(
-    `id`        int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `sink_name` varchar(128) NOT NULL COMMENT 'FlumeSink name, English, numbers and underscore',
-    `set_name`  varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `type`      varchar(128) NOT NULL COMMENT 'FlumeSink classname',
-    `channel`   varchar(128) NOT NULL COMMENT 'FlumeSink channel',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_flume_sink` (`sink_name`, `set_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSink table';
-
--- ----------------------------
--- Table structure for flume_sink_ext
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_sink_ext`
-(
-    `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `parent_name` varchar(128) NOT NULL COMMENT 'FlumeSink name, English, numbers and underscore',
-    `set_name`    varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
-    `key_name`    varchar(256) NOT NULL COMMENT 'Configuration item name',
-    `key_value`   text         NULL COMMENT 'The value of the configuration item',
-    `is_deleted`  int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`),
-    KEY `index_flume_sink_ext` (`parent_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSink extension table';
-
 -- ----------------------------
 -- Table structure for db_collector_detail_task
 -- ----------------------------
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 5ee5986ff..ef0a93021 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
 import org.apache.inlong.manager.common.beans.Response;
@@ -31,8 +29,12 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
+
 import java.util.List;
 
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+
 /**
  * Data proxy controller.
  */
@@ -70,9 +72,9 @@ public class DataProxyController {
 
     @GetMapping("/getAllConfig")
     @ApiOperation(value = "Get all proxy config")
-    public String getAllConfig(@RequestParam("clusterName") String clusterName, @RequestParam("setName") String setName,
+    public String getAllConfig(@RequestParam("clusterName") String clusterName,
             @RequestParam(value = "md5", required = false) String md5) {
-        return dataProxyClusterService.getAllConfig(clusterName, setName, md5);
+        return dataProxyClusterService.getAllConfig(clusterName, md5);
     }
 
 }
diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/main/resources/application.properties
index ed6f02848..731a80d70 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -25,6 +25,7 @@ server.servlet.context-path=/api/inlong/manager
 spring.application.name=InLong-Manager-Web
 spring.profiles.active=dev
 
+spring.main.allow-circular-references=true
 spring.mvc.pathmatch.matching-strategy=ANT_PATH_MATCHER
 
 # Serialize the Date type to a timestamp
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
index a60f45fd4..d70501063 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
@@ -17,25 +17,25 @@
 
 package org.apache.inlong.sort.standalone.dispatch;
 
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flume.Context;
-import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
-import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.slf4j.Logger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * DispatchManager
  */
 public class DispatchManager {
 
-    public static final Logger LOG = InlongLoggerFactory.getLogger(BufferQueueChannel.class);
+    public static final Logger LOG = InlongLoggerFactory.getLogger(DispatchManager.class);
     public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
     public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
     public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";
@@ -44,13 +44,15 @@ public class DispatchManager {
     public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;
     public static final long MINUTE_MS = 60L * 1000;
 
-    private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
     private final long dispatchTimeout;
     private final long maxPackCount;
     private final long maxPackSize;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
     private ConcurrentHashMap<String, DispatchProfile> profileCache = new ConcurrentHashMap<>();
-    //
+    // Flag indicating that the manager needs to output overtime data.
     private AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
+    private AtomicLong inCounter = new AtomicLong(0);
+    private AtomicLong outCounter = new AtomicLong(0);
 
     /**
      * Constructor
@@ -71,10 +73,6 @@ public class DispatchManager {
      * @param event
      */
     public void addEvent(ProfileEvent event) {
-        if (needOutputOvertimeData.get()) {
-            this.outputOvertimeData();
-            this.needOutputOvertimeData.set(false);
-        }
         // parse
         String eventUid = event.getUid();
         long dispatchTime = event.getRawLogTime() - event.getRawLogTime() % MINUTE_MS;
@@ -93,8 +91,10 @@ public class DispatchManager {
                     event.getInlongStreamId(), dispatchTime);
             DispatchProfile oldDispatchProfile = this.profileCache.put(dispatchKey, newDispatchProfile);
             this.dispatchQueue.offer(oldDispatchProfile);
+            outCounter.addAndGet(dispatchProfile.getCount());
             newDispatchProfile.addEvent(event, maxPackCount, maxPackSize);
         }
+        inCounter.incrementAndGet();
     }
 
     /**
@@ -103,6 +103,9 @@ public class DispatchManager {
      * @return
      */
     public void outputOvertimeData() {
+        if (!needOutputOvertimeData.getAndSet(false)) {
+            return;
+        }
         LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                 profileCache.size(), dispatchQueue.size());
         long currentTime = System.currentTimeMillis();
@@ -119,10 +122,16 @@ public class DispatchManager {
         }
         // output
         removeKeys.forEach((key) -> {
-            dispatchQueue.offer(this.profileCache.remove(key));
+            DispatchProfile dispatchProfile = this.profileCache.remove(key);
+            if (dispatchProfile != null) {
+                dispatchQueue.offer(dispatchProfile);
+                outCounter.addAndGet(dispatchProfile.getCount());
+            }
         });
-        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}",
-                profileCache.size(), dispatchQueue.size(), eventCount);
+        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+                + "inCounter:{},outCounter:{}",
+                profileCache.size(), dispatchQueue.size(), eventCount,
+                inCounter.getAndSet(0), outCounter.getAndSet(0));
     }
 
     /**
@@ -156,6 +165,6 @@ public class DispatchManager {
      * setNeedOutputOvertimeData
      */
     public void setNeedOutputOvertimeData() {
-        this.needOutputOvertimeData.set(true);
+        this.needOutputOvertimeData.getAndSet(true);
     }
 }