You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/01/08 23:51:35 UTC

[incubator-eventmesh] branch master updated: [ISSUE #285] add SubscriptionManager (#2856)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new d6ae87ad3 [ISSUE #285] add SubscriptionManager (#2856)
d6ae87ad3 is described below

commit d6ae87ad3284c2cb17012cec2745e6653fe6cf22
Author: wqliang <wq...@users.noreply.github.com>
AuthorDate: Mon Jan 9 07:51:29 2023 +0800

    [ISSUE #285] add SubscriptionManager (#2856)
    
    * add SubscriptionManager
    
    * register client in SubscriptionManager
    
    * update subscription and urls by SubscriptionManager
    
    * code reactor: new Client obj only when need
    
    * format code style
    
    * format code style
---
 .../runtime/admin/handler/HTTPClientHandler.java   |   4 +-
 .../runtime/boot/EventMeshHTTPServer.java          |  16 +--
 .../runtime/core/consumer/ClientInfo.java          |  70 +++++++++++
 .../runtime/core/consumer/SubscriptionManager.java | 132 +++++++++++++++++++
 .../core/consumergroup/ConsumerGroupConf.java      |   8 +-
 .../http/consumer/HttpClientGroupMapping.java      |   4 +-
 .../http/processor/HeartBeatProcessor.java         |   8 +-
 .../processor/LocalSubscribeEventProcessor.java    | 139 +++------------------
 .../processor/LocalUnSubscribeEventProcessor.java  |  20 +--
 .../http/processor/SubscribeProcessor.java         | 138 +++-----------------
 .../http/processor/UnSubscribeProcessor.java       |  22 ++--
 .../http/processor/inf/AbstractEventProcessor.java |   2 +-
 12 files changed, 275 insertions(+), 288 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java
index e80107a7f..3ca9d5c55 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java
@@ -77,7 +77,7 @@ public class HTTPClientHandler implements HttpHandler {
             DeleteHTTPClientRequest deleteHTTPClientRequest = JsonUtils.toObject(request, DeleteHTTPClientRequest.class);
             String url = deleteHTTPClientRequest.url;
 
-            for (List<Client> clientList : eventMeshHTTPServer.localClientInfoMapping.values()) {
+            for (List<Client> clientList : eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().values()) {
                 clientList.removeIf(client -> Objects.equals(client.getUrl(), url));
             }
 
@@ -118,7 +118,7 @@ public class HTTPClientHandler implements HttpHandler {
             // Get the list of HTTP clients
             List<GetClientResponse> getClientResponseList = new ArrayList<>();
 
-            for (List<Client> clientList : eventMeshHTTPServer.localClientInfoMapping.values()) {
+            for (List<Client> clientList : eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().values()) {
                 for (Client client : clientList) {
                     GetClientResponse getClientResponse = new GetClientResponse(
                         Optional.ofNullable(client.getEnv()).orElse(""),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 3c1b21688..ec375a182 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -29,7 +29,7 @@ import org.apache.eventmesh.metrics.api.MetricsRegistry;
 import org.apache.eventmesh.runtime.common.ServiceState;
 import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
 import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor;
@@ -86,6 +86,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
     private transient ConsumerManager consumerManager;
 
+    private transient SubscriptionManager subscriptionManager;
+
     private transient ProducerManager producerManager;
 
     private transient HttpRetryer httpRetryer;
@@ -112,12 +114,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
     public transient HTTPClientPool httpClientPool = new HTTPClientPool(10);
 
-    public final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping =
-            new ConcurrentHashMap<>();
-
-    public final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping =
-            new ConcurrentHashMap<>();
-
     public EventMeshHTTPServer(final EventMeshServer eventMeshServer,
                                final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
         super(eventMeshHttpConfiguration.httpServerPort, eventMeshHttpConfiguration.eventMeshServerUseTls, eventMeshHttpConfiguration);
@@ -246,6 +242,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
         this.setMetrics(new HTTPMetricsServer(this, metricsRegistries));
 
+        subscriptionManager = new SubscriptionManager();
+
         consumerManager = new ConsumerManager(this);
         consumerManager.init();
 
@@ -409,6 +407,10 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
         this.getHandlerService().register(webHookProcessor, webhookExecutor);
     }
 
+    public SubscriptionManager getSubscriptionManager() {
+        return subscriptionManager;
+    }
+
     public ConsumerManager getConsumerManager() {
         return consumerManager;
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/ClientInfo.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/ClientInfo.java
new file mode 100644
index 000000000..60be5a1f1
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/ClientInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.consumer;
+
+public class ClientInfo {
+    private String env;
+
+    private String idc;
+
+    private String sys;
+
+    private String pid;
+
+    private String ip;
+
+    public String getEnv() {
+        return env;
+    }
+
+    public void setEnv(String env) {
+        this.env = env;
+    }
+
+    public String getIdc() {
+        return idc;
+    }
+
+    public void setIdc(String idc) {
+        this.idc = idc;
+    }
+
+    public String getSys() {
+        return sys;
+    }
+
+    public void setSys(String sys) {
+        this.sys = sys;
+    }
+
+    public String getPid() {
+        return pid;
+    }
+
+    public void setPid(String pid) {
+        this.pid = pid;
+    }
+
+    public String getIp() {
+        return ip;
+    }
+
+    public void setIp(String ip) {
+        this.ip = ip;
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
new file mode 100644
index 000000000..04f7b8c57
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.consumer;
+
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriptionManager {
+    private static final Logger logger = LoggerFactory.getLogger(SubscriptionManager.class);
+    private final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping =
+            new ConcurrentHashMap<>();
+
+    private final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping =
+            new ConcurrentHashMap<>();
+
+    public ConcurrentHashMap<String, ConsumerGroupConf> getLocalConsumerGroupMapping() {
+        return localConsumerGroupMapping;
+    }
+
+    public ConcurrentHashMap<String, List<Client>> getLocalClientInfoMapping() {
+        return localClientInfoMapping;
+    }
+
+    public void registerClient(final ClientInfo clientInfo, final String consumerGroup,
+                               final List<SubscriptionItem> subscriptionItems, final String url) {
+        for (final SubscriptionItem subscription : subscriptionItems) {
+            final String groupTopicKey = consumerGroup + "@" + subscription.getTopic();
+
+            List<Client> localClients = localClientInfoMapping.get(groupTopicKey);
+
+            if (localClients == null) {
+                localClientInfoMapping.putIfAbsent(groupTopicKey, new ArrayList<>());
+                localClients = localClientInfoMapping.get(groupTopicKey);
+            }
+
+            boolean isContains = false;
+            for (final Client localClient : localClients) {
+                //TODO: compare the whole Client would be better?
+                if (StringUtils.equals(localClient.getUrl(), url)) {
+                    isContains = true;
+                    localClient.setLastUpTime(new Date());
+                    break;
+                }
+            }
+
+            if (!isContains) {
+                Client client = new Client();
+                client.setEnv(clientInfo.getEnv());
+                client.setIdc(clientInfo.getIdc());
+                client.setSys(clientInfo.getSys());
+                client.setIp(clientInfo.getIp());
+                client.setPid(clientInfo.getPid());
+                client.setConsumerGroup(consumerGroup);
+                client.setTopic(subscription.getTopic());
+                client.setUrl(url);
+                client.setLastUpTime(new Date());
+                localClients.add(client);
+            }
+        }
+    }
+
+    public void updateSubscription(ClientInfo clientInfo, String consumerGroup,
+                                   String url, List<SubscriptionItem> subscriptionList) {
+        for (final SubscriptionItem subscription : subscriptionList) {
+            final List<Client> groupTopicClients = localClientInfoMapping
+                    .get(consumerGroup + "@" + subscription.getTopic());
+
+            if (CollectionUtils.isEmpty(groupTopicClients)) {
+                logger.error("group {} topic {} clients is empty", consumerGroup, subscription);
+            }
+
+            ConsumerGroupConf consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
+            if (consumerGroupConf == null) {
+                // new subscription
+                ConsumerGroupConf prev = localConsumerGroupMapping.putIfAbsent(consumerGroup, new ConsumerGroupConf(consumerGroup));
+                if (prev == null) {
+                    logger.info("add new subscription, consumer group: {}", consumerGroup);
+                }
+                consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
+            }
+
+            ConsumerGroupTopicConf consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf()
+                    .get(subscription.getTopic());
+            if (consumerGroupTopicConf == null) {
+                consumerGroupConf.getConsumerGroupTopicConf().computeIfAbsent(subscription.getTopic(), (topic) -> {
+                    ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
+                    newTopicConf.setConsumerGroup(consumerGroup);
+                    newTopicConf.setTopic(topic);
+                    newTopicConf.setSubscriptionItem(subscription);
+                    logger.info("add new {}", newTopicConf);
+                    return newTopicConf;
+                });
+                consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf().get(subscription.getTopic());
+            }
+
+            consumerGroupTopicConf.getUrls().add(url);
+            if (!consumerGroupTopicConf.getIdcUrls().containsKey(clientInfo.getIdc())) {
+                consumerGroupTopicConf.getIdcUrls().putIfAbsent(clientInfo.getIdc(), new ArrayList<>());
+            }
+            //TODO: idcUrl list is not thread-safe
+            consumerGroupTopicConf.getIdcUrls().get(clientInfo.getIdc()).add(url);
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java
index 2c44b5958..289256c76 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.runtime.core.consumergroup;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.collect.Maps;
 
@@ -27,7 +28,8 @@ public class ConsumerGroupConf implements Serializable {
     //eg . 5013-1A0
     private String consumerGroup;
 
-    private Map<String, ConsumerGroupTopicConf> consumerGroupTopicConf = Maps.newConcurrentMap();
+    private final ConcurrentHashMap<String/*topic*/, ConsumerGroupTopicConf> consumerGroupTopicConf
+            = new ConcurrentHashMap<String, ConsumerGroupTopicConf>();
 
     public ConsumerGroupConf(String consumerGroup) {
         this.consumerGroup = consumerGroup;
@@ -45,10 +47,6 @@ public class ConsumerGroupConf implements Serializable {
         return consumerGroupTopicConf;
     }
 
-    public void setConsumerGroupTopicConf(Map<String, ConsumerGroupTopicConf> consumerGroupTopicConf) {
-        this.consumerGroupTopicConf = consumerGroupTopicConf;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
index 43103672d..a0b2186be 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
@@ -229,9 +229,7 @@ public final class HttpClientGroupMapping {
             urls.add(url);
             idcUrls.put(clientIdc, urls);
             consumeTopicConfig.setIdcUrls(idcUrls);
-            final Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
-            map.put(subTopic.getTopic(), consumeTopicConfig);
-            consumerGroupConf.setConsumerGroupTopicConf(map);
+            consumerGroupConf.getConsumerGroupTopicConf().put(subTopic.getTopic(), consumeTopicConfig);
             localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
             isChange = true;
         } else {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index a606f8e04..3bd84f055 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -162,17 +162,17 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
 
         }
 
-        synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+        synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
             for (final Map.Entry<String, List<Client>> groupTopicClientMapping : tmpMap.entrySet()) {
                 final List<Client> localClientList =
-                        eventMeshHTTPServer.localClientInfoMapping.get(groupTopicClientMapping.getKey());
+                        eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicClientMapping.getKey());
                 if (CollectionUtils.isEmpty(localClientList)) {
-                    eventMeshHTTPServer.localClientInfoMapping
+                    eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()
                             .put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue());
                 } else {
                     final List<Client> tmpClientList = groupTopicClientMapping.getValue();
                     supplyClientInfoList(tmpClientList, localClientList);
-                    eventMeshHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), localClientList);
+                    eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicClientMapping.getKey(), localClientList);
                 }
 
             }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
index afc2cad28..3a2e57845 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
@@ -29,21 +29,14 @@ import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.common.EventMeshTrace;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.consumer.ClientInfo;
+import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor;
-import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.runtime.util.WebhookUtil;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -173,87 +166,17 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
             return;
         }
 
-        synchronized (eventMeshHTTPServer.localClientInfoMapping) {
-
-            registerClient(requestWrapper, consumerGroup, subscriptionList, url);
-
-            for (final SubscriptionItem subTopic : subscriptionList) {
-                final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
-                        .get(consumerGroup + "@" + subTopic.getTopic());
-
-                if (CollectionUtils.isEmpty(groupTopicClients)) {
-                    if (log.isErrorEnabled()) {
-                        log.error("group {} topic {} clients is empty", consumerGroup, subTopic);
-                    }
-                }
-
-                final Map<String, List<String>> idcUrls = new HashMap<>();
-                for (final Client client : groupTopicClients) {
-                    if (idcUrls.containsKey(client.getIdc())) {
-                        idcUrls.get(client.getIdc()).add(StringUtils.deleteWhitespace(client.getUrl()));
-                    } else {
-                        final List<String> urls = new ArrayList<>();
-                        urls.add(client.getUrl());
-                        idcUrls.put(client.getIdc(), urls);
-                    }
-                }
-
-                ConsumerGroupConf consumerGroupConf =
-                        eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
-                if (consumerGroupConf == null) {
-                    // new subscription
-                    consumerGroupConf = new ConsumerGroupConf(consumerGroup);
-                    final ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
-                    consumeTopicConfig.setConsumerGroup(consumerGroup);
-                    consumeTopicConfig.setTopic(subTopic.getTopic());
-                    consumeTopicConfig.setSubscriptionItem(subTopic);
-                    consumeTopicConfig.setUrls(new HashSet<>(Collections.singletonList(url)));
-                    consumeTopicConfig.setIdcUrls(idcUrls);
-
-                    final Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
-                    map.put(subTopic.getTopic(), consumeTopicConfig);
-                    consumerGroupConf.setConsumerGroupTopicConf(map);
-                } else {
-                    // already subscribed
-                    final Map<String, ConsumerGroupTopicConf> map =
-                            consumerGroupConf.getConsumerGroupTopicConf();
-                    if (!map.containsKey(subTopic.getTopic())) {
-                        //If there are multiple topics, append it
-                        final ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
-                        newTopicConf.setConsumerGroup(consumerGroup);
-                        newTopicConf.setTopic(subTopic.getTopic());
-                        newTopicConf.setSubscriptionItem(subTopic);
-                        newTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
-                        newTopicConf.setIdcUrls(idcUrls);
-                        map.put(subTopic.getTopic(), newTopicConf);
-                    }
-
-                    for (final Map.Entry<String, ConsumerGroupTopicConf> set : map.entrySet()) {
-                        if (!StringUtils.equals(subTopic.getTopic(), set.getKey())) {
-                            continue;
-                        }
-
-                        final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
-                        latestTopicConf.setConsumerGroup(consumerGroup);
-                        latestTopicConf.setTopic(subTopic.getTopic());
-                        latestTopicConf.setSubscriptionItem(subTopic);
-                        latestTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
-
-                        final ConsumerGroupTopicConf currentTopicConf = set.getValue();
-                        latestTopicConf.getUrls().addAll(currentTopicConf.getUrls());
-                        latestTopicConf.setIdcUrls(idcUrls);
-
-                        map.put(set.getKey(), latestTopicConf);
-                    }
-                }
-                eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
-            }
+        synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
+            ClientInfo clientInfo = getClientInfo(requestWrapper);
+            SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
+            subscriptionManager.registerClient(clientInfo, consumerGroup, subscriptionList, url);
+            subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subscriptionList);
 
             final long startTime = System.currentTimeMillis();
             try {
                 // subscription relationship change notification
                 eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
-                        eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
+                        eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
                 responseBodyMap.put("retCode", EventMeshRetCode.SUCCESS.getRetCode());
                 responseBodyMap.put("retMsg", EventMeshRetCode.SUCCESS.getErrMsg());
 
@@ -282,44 +205,14 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
         return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
     }
 
-    private void registerClient(final HttpEventWrapper requestWrapper, final String consumerGroup,
-                                final List<SubscriptionItem> subscriptionItems, final String url) {
+    private ClientInfo getClientInfo(final HttpEventWrapper requestWrapper) {
         final Map<String, Object> requestHeaderMap = requestWrapper.getSysHeaderMap();
-        for (final SubscriptionItem item : subscriptionItems) {
-            final Client client = new Client();
-            client.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString());
-            client.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString());
-            client.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString());
-            client.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString());
-            client.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
-            client.setConsumerGroup(consumerGroup);
-            client.setTopic(item.getTopic());
-            client.setUrl(url);
-            client.setLastUpTime(new Date());
-
-            final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
-
-            List<Client> localClients =
-                    eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
-            if (localClients == null) {
-                localClients = new ArrayList<>();
-                eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, localClients);
-            }
-
-            boolean isContains = false;
-            for (final Client localClient : localClients) {
-                if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
-                    isContains = true;
-                    localClient.setLastUpTime(client.getLastUpTime());
-                    break;
-                }
-            }
-
-            if (!isContains) {
-                localClients.add(client);
-            }
-
-        }
+        ClientInfo clientInfo = new ClientInfo();
+        clientInfo.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString());
+        clientInfo.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString());
+        clientInfo.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString());
+        clientInfo.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString());
+        clientInfo.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
+        return clientInfo;
     }
-
 }
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
index ae703436f..1810b842b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
@@ -127,13 +127,13 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
 
         final String pid = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString();
 
-        synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+        synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
             boolean isChange = true;
 
             registerClient(requestWrapper, consumerGroup, unSubTopicList, unSubscribeUrl);
 
             for (final String unSubTopic : unSubTopicList) {
-                final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+                final List<Client> groupTopicClients = eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()
                         .get(consumerGroup + "@" + unSubTopic);
                 final Iterator<Client> clientIterator = groupTopicClients.iterator();
                 while (clientIterator.hasNext()) {
@@ -167,9 +167,9 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
 
                     }
 
-                    synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
+                    synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()) {
                         final ConsumerGroupConf consumerGroupConf =
-                                eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+                                eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup);
                         final Map<String, ConsumerGroupTopicConf> map =
                                 consumerGroupConf.getConsumerGroupTopicConf();
                         for (final Map.Entry<String, ConsumerGroupTopicConf> entry : map.entrySet()) {
@@ -184,7 +184,7 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
                                 map.put(unSubTopic, latestTopicConf);
                             }
                         }
-                        eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
+                        eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().put(consumerGroup, consumerGroupConf);
                     }
                 } else {
                     isChange = false;
@@ -195,7 +195,7 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
             if (isChange) {
                 try {
                     eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
-                            eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
+                            eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
 
                     responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
                     responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg());
@@ -221,10 +221,10 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
 
                     handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
                     // clean ClientInfo
-                    eventMeshHTTPServer.localClientInfoMapping.keySet()
+                    eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().keySet()
                             .removeIf(s -> StringUtils.contains(s, consumerGroup));
                     // clean ConsumerGroupInfo
-                    eventMeshHTTPServer.localConsumerGroupMapping.keySet()
+                    eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().keySet()
                             .removeIf(s -> StringUtils.equals(consumerGroup, s));
                 } catch (Exception e) {
                     if (log.isErrorEnabled()) {
@@ -272,11 +272,11 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
             final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
 
             List<Client> localClients =
-                    eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+                    eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicKey);
 
             if (localClients == null) {
                 localClients = new ArrayList<>();
-                eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, localClients);
+                eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicKey, localClients);
             }
 
             boolean isContains = false;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index f1935815a..3f6c24387 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -31,11 +31,10 @@ import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.consumer.ClientInfo;
+import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
-import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
@@ -44,13 +43,7 @@ import org.apache.eventmesh.runtime.util.WebhookUtil;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -189,85 +182,17 @@ public class SubscribeProcessor implements HttpRequestProcessor {
             return;
         }
 
-        synchronized (eventMeshHTTPServer.localClientInfoMapping) {
-
-            registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
-
-            for (final SubscriptionItem subTopic : subTopicList) {
-                final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
-                        .get(consumerGroup + "@" + subTopic.getTopic());
-
-                if (CollectionUtils.isEmpty(groupTopicClients)) {
-                    LOGGER.error("group {} topic {} clients is empty", consumerGroup, subTopic);
-                }
-
-                final Map<String, List<String>> idcUrls = new HashMap<>();
-                for (final Client client : groupTopicClients) {
-                    List<String> urls = idcUrls.get(client.getIdc());
-                    if (urls == null) {
-                        urls = new ArrayList<>();
-                        idcUrls.put(client.getIdc(), urls);
-                    }
-                    urls.add(StringUtils.deleteWhitespace(client.getUrl()));
-                }
-
-                ConsumerGroupConf consumerGroupConf =
-                        eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
-
-                if (consumerGroupConf == null) {
-                    // new subscription
-                    consumerGroupConf = new ConsumerGroupConf(consumerGroup);
-                    final ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
-                    consumeTopicConfig.setConsumerGroup(consumerGroup);
-                    consumeTopicConfig.setTopic(subTopic.getTopic());
-                    consumeTopicConfig.setSubscriptionItem(subTopic);
-                    consumeTopicConfig.setUrls(new HashSet<>(Collections.singletonList(url)));
-
-                    consumeTopicConfig.setIdcUrls(idcUrls);
-
-                    final Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
-                    map.put(subTopic.getTopic(), consumeTopicConfig);
-                    consumerGroupConf.setConsumerGroupTopicConf(map);
-                } else {
-                    // already subscribed
-                    final Map<String, ConsumerGroupTopicConf> map =
-                            consumerGroupConf.getConsumerGroupTopicConf();
-
-                    if (!map.containsKey(subTopic.getTopic())) {
-                        //If there are multiple topics, append it
-                        final ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
-                        newTopicConf.setConsumerGroup(consumerGroup);
-                        newTopicConf.setTopic(subTopic.getTopic());
-                        newTopicConf.setSubscriptionItem(subTopic);
-                        newTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
-                        newTopicConf.setIdcUrls(idcUrls);
-                        map.put(subTopic.getTopic(), newTopicConf);
-                    }
-
-                    for (final Map.Entry<String, ConsumerGroupTopicConf> set : map.entrySet()) {
-                        if (StringUtils.equals(subTopic.getTopic(), set.getKey())) {
-                            final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
-                            latestTopicConf.setConsumerGroup(consumerGroup);
-                            latestTopicConf.setTopic(subTopic.getTopic());
-                            latestTopicConf.setSubscriptionItem(subTopic);
-                            latestTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
-
-                            final ConsumerGroupTopicConf currentTopicConf = set.getValue();
-                            latestTopicConf.getUrls().addAll(currentTopicConf.getUrls());
-                            latestTopicConf.setIdcUrls(idcUrls);
-
-                            map.put(set.getKey(), latestTopicConf);
-                        }
-                    }
-                }
-                eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
-            }
+        synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
+            ClientInfo clientInfo = getClientInfo(subscribeRequestHeader);
+            SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
+            subscriptionManager.registerClient(clientInfo, consumerGroup, subTopicList, url);
+            subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subTopicList);
 
             final long startTime = System.currentTimeMillis();
             try {
                 // subscription relationship change notification
                 eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
-                        eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
+                        eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
 
                 final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() {
                     @Override
@@ -316,44 +241,13 @@ public class SubscribeProcessor implements HttpRequestProcessor {
         return false;
     }
 
-    private void registerClient(final SubscribeRequestHeader subscribeRequestHeader, final String consumerGroup,
-                                final List<SubscriptionItem> subscriptionItems, final String url) {
-        for (final SubscriptionItem item : subscriptionItems) {
-            final Client client = new Client();
-            client.setEnv(subscribeRequestHeader.getEnv());
-            client.setIdc(subscribeRequestHeader.getIdc());
-            client.setSys(subscribeRequestHeader.getSys());
-            client.setIp(subscribeRequestHeader.getIp());
-            client.setPid(subscribeRequestHeader.getPid());
-            client.setConsumerGroup(consumerGroup);
-            client.setTopic(item.getTopic());
-            client.setUrl(url);
-            client.setLastUpTime(new Date());
-
-            final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
-
-            List<Client> localClients =
-                    eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
-
-            if (localClients == null) {
-                localClients = new ArrayList<>();
-                eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, localClients);
-            }
-
-            boolean isContains = false;
-            for (final Client localClient : localClients) {
-                if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
-                    isContains = true;
-                    localClient.setLastUpTime(client.getLastUpTime());
-                    break;
-                }
-            }
-
-            if (!isContains) {
-                localClients.add(client);
-            }
-
-        }
+    private ClientInfo getClientInfo(final SubscribeRequestHeader subscribeRequestHeader) {
+        ClientInfo clientInfo = new ClientInfo();
+        clientInfo.setEnv(subscribeRequestHeader.getEnv());
+        clientInfo.setIdc(subscribeRequestHeader.getIdc());
+        clientInfo.setSys(subscribeRequestHeader.getSys());
+        clientInfo.setIp(subscribeRequestHeader.getIp());
+        clientInfo.setPid(subscribeRequestHeader.getPid());
+        return clientInfo;
     }
-
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index 3f6e6c18d..0e6049d62 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -136,13 +136,13 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
             }
         };
 
-        synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+        synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
             boolean isChange = true;
 
             registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl);
 
             for (final String unSubTopic : unSubTopicList) {
-                final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+                final List<Client> groupTopicClients = eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()
                         .get(consumerGroup + "@" + unSubTopic);
 
                 final Iterator<Client> clientIterator = groupTopicClients.iterator();
@@ -173,9 +173,9 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                         }
 
                     }
-                    synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
+                    synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()) {
                         final ConsumerGroupConf consumerGroupConf =
-                                eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+                                eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup);
 
                         final Map<String, ConsumerGroupTopicConf> map =
                                 consumerGroupConf.getConsumerGroupTopicConf();
@@ -195,7 +195,7 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                                 map.put(unSubTopic, latestTopicConf);
                             }
                         }
-                        eventMeshHTTPServer.localConsumerGroupMapping
+                        eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()
                                 .put(consumerGroup, consumerGroupConf);
                     }
                 } else {
@@ -208,7 +208,7 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
             if (isChange) {
                 try {
                     eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
-                            eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
+                            eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
 
                     responseEventMeshCommand =
                             asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS);
@@ -239,10 +239,10 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                             asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS);
                     asyncContext.onComplete(responseEventMeshCommand, handler);
                     // clean ClientInfo
-                    eventMeshHTTPServer.localClientInfoMapping.keySet()
+                    eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().keySet()
                             .removeIf(s -> StringUtils.contains(s, consumerGroup));
                     // clean ConsumerGroupInfo
-                    eventMeshHTTPServer.localConsumerGroupMapping.keySet()
+                    eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().keySet()
                             .removeIf(s -> StringUtils.equals(consumerGroup, s));
                 } catch (Exception e) {
                     final HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
@@ -286,9 +286,9 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
             client.setLastUpTime(new Date());
 
             final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
-            if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
+            if (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().containsKey(groupTopicKey)) {
                 final List<Client> localClients =
-                        eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+                        eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicKey);
                 boolean isContains = false;
                 for (final Client localClient : localClients) {
                     if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
@@ -303,7 +303,7 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
             } else {
                 final List<Client> clients = new ArrayList<>();
                 clients.add(client);
-                eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients);
+                eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicKey, clients);
             }
         }
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
index 4cc65a77b..5828b5248 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
@@ -84,7 +84,7 @@ public abstract class AbstractEventProcessor implements AsyncHttpProcessor {
 
             Map<String, String> metadata = new HashMap<>(1 << 4);
             for (Map.Entry<String, ConsumerGroupConf> consumerGroupMap :
-                    eventMeshHTTPServer.localConsumerGroupMapping.entrySet()) {
+                    eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().entrySet()) {
 
                 String consumerGroupKey = consumerGroupMap.getKey();
                 ConsumerGroupConf consumerGroupConf = consumerGroupMap.getValue();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org