You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/01/08 23:51:35 UTC
[incubator-eventmesh] branch master updated: [ISSUE #285] add SubscriptionManager (#2856)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new d6ae87ad3 [ISSUE #285] add SubscriptionManager (#2856)
d6ae87ad3 is described below
commit d6ae87ad3284c2cb17012cec2745e6653fe6cf22
Author: wqliang <wq...@users.noreply.github.com>
AuthorDate: Mon Jan 9 07:51:29 2023 +0800
[ISSUE #285] add SubscriptionManager (#2856)
* add SubscriptionManager
* register client in SubscriptionManager
* update subscription and urls by SubscriptionManager
* code reactor: new Client obj only when need
* format code style
* format code style
---
.../runtime/admin/handler/HTTPClientHandler.java | 4 +-
.../runtime/boot/EventMeshHTTPServer.java | 16 +--
.../runtime/core/consumer/ClientInfo.java | 70 +++++++++++
.../runtime/core/consumer/SubscriptionManager.java | 132 +++++++++++++++++++
.../core/consumergroup/ConsumerGroupConf.java | 8 +-
.../http/consumer/HttpClientGroupMapping.java | 4 +-
.../http/processor/HeartBeatProcessor.java | 8 +-
.../processor/LocalSubscribeEventProcessor.java | 139 +++------------------
.../processor/LocalUnSubscribeEventProcessor.java | 20 +--
.../http/processor/SubscribeProcessor.java | 138 +++-----------------
.../http/processor/UnSubscribeProcessor.java | 22 ++--
.../http/processor/inf/AbstractEventProcessor.java | 2 +-
12 files changed, 275 insertions(+), 288 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java
index e80107a7f..3ca9d5c55 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java
@@ -77,7 +77,7 @@ public class HTTPClientHandler implements HttpHandler {
DeleteHTTPClientRequest deleteHTTPClientRequest = JsonUtils.toObject(request, DeleteHTTPClientRequest.class);
String url = deleteHTTPClientRequest.url;
- for (List<Client> clientList : eventMeshHTTPServer.localClientInfoMapping.values()) {
+ for (List<Client> clientList : eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().values()) {
clientList.removeIf(client -> Objects.equals(client.getUrl(), url));
}
@@ -118,7 +118,7 @@ public class HTTPClientHandler implements HttpHandler {
// Get the list of HTTP clients
List<GetClientResponse> getClientResponseList = new ArrayList<>();
- for (List<Client> clientList : eventMeshHTTPServer.localClientInfoMapping.values()) {
+ for (List<Client> clientList : eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().values()) {
for (Client client : clientList) {
GetClientResponse getClientResponse = new GetClientResponse(
Optional.ofNullable(client.getEnv()).orElse(""),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 3c1b21688..ec375a182 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -29,7 +29,7 @@ import org.apache.eventmesh.metrics.api.MetricsRegistry;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor;
@@ -86,6 +86,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
private transient ConsumerManager consumerManager;
+ private transient SubscriptionManager subscriptionManager;
+
private transient ProducerManager producerManager;
private transient HttpRetryer httpRetryer;
@@ -112,12 +114,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
public transient HTTPClientPool httpClientPool = new HTTPClientPool(10);
- public final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping =
- new ConcurrentHashMap<>();
-
- public final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping =
- new ConcurrentHashMap<>();
-
public EventMeshHTTPServer(final EventMeshServer eventMeshServer,
final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
super(eventMeshHttpConfiguration.httpServerPort, eventMeshHttpConfiguration.eventMeshServerUseTls, eventMeshHttpConfiguration);
@@ -246,6 +242,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
this.setMetrics(new HTTPMetricsServer(this, metricsRegistries));
+ subscriptionManager = new SubscriptionManager();
+
consumerManager = new ConsumerManager(this);
consumerManager.init();
@@ -409,6 +407,10 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
this.getHandlerService().register(webHookProcessor, webhookExecutor);
}
+ public SubscriptionManager getSubscriptionManager() {
+ return subscriptionManager;
+ }
+
public ConsumerManager getConsumerManager() {
return consumerManager;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/ClientInfo.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/ClientInfo.java
new file mode 100644
index 000000000..60be5a1f1
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/ClientInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.consumer;
+
+public class ClientInfo {
+ private String env;
+
+ private String idc;
+
+ private String sys;
+
+ private String pid;
+
+ private String ip;
+
+ public String getEnv() {
+ return env;
+ }
+
+ public void setEnv(String env) {
+ this.env = env;
+ }
+
+ public String getIdc() {
+ return idc;
+ }
+
+ public void setIdc(String idc) {
+ this.idc = idc;
+ }
+
+ public String getSys() {
+ return sys;
+ }
+
+ public void setSys(String sys) {
+ this.sys = sys;
+ }
+
+ public String getPid() {
+ return pid;
+ }
+
+ public void setPid(String pid) {
+ this.pid = pid;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
new file mode 100644
index 000000000..04f7b8c57
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.consumer;
+
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriptionManager {
+ private static final Logger logger = LoggerFactory.getLogger(SubscriptionManager.class);
+ private final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping =
+ new ConcurrentHashMap<>();
+
+ private final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping =
+ new ConcurrentHashMap<>();
+
+ public ConcurrentHashMap<String, ConsumerGroupConf> getLocalConsumerGroupMapping() {
+ return localConsumerGroupMapping;
+ }
+
+ public ConcurrentHashMap<String, List<Client>> getLocalClientInfoMapping() {
+ return localClientInfoMapping;
+ }
+
+ public void registerClient(final ClientInfo clientInfo, final String consumerGroup,
+ final List<SubscriptionItem> subscriptionItems, final String url) {
+ for (final SubscriptionItem subscription : subscriptionItems) {
+ final String groupTopicKey = consumerGroup + "@" + subscription.getTopic();
+
+ List<Client> localClients = localClientInfoMapping.get(groupTopicKey);
+
+ if (localClients == null) {
+ localClientInfoMapping.putIfAbsent(groupTopicKey, new ArrayList<>());
+ localClients = localClientInfoMapping.get(groupTopicKey);
+ }
+
+ boolean isContains = false;
+ for (final Client localClient : localClients) {
+ //TODO: compare the whole Client would be better?
+ if (StringUtils.equals(localClient.getUrl(), url)) {
+ isContains = true;
+ localClient.setLastUpTime(new Date());
+ break;
+ }
+ }
+
+ if (!isContains) {
+ Client client = new Client();
+ client.setEnv(clientInfo.getEnv());
+ client.setIdc(clientInfo.getIdc());
+ client.setSys(clientInfo.getSys());
+ client.setIp(clientInfo.getIp());
+ client.setPid(clientInfo.getPid());
+ client.setConsumerGroup(consumerGroup);
+ client.setTopic(subscription.getTopic());
+ client.setUrl(url);
+ client.setLastUpTime(new Date());
+ localClients.add(client);
+ }
+ }
+ }
+
+ public void updateSubscription(ClientInfo clientInfo, String consumerGroup,
+ String url, List<SubscriptionItem> subscriptionList) {
+ for (final SubscriptionItem subscription : subscriptionList) {
+ final List<Client> groupTopicClients = localClientInfoMapping
+ .get(consumerGroup + "@" + subscription.getTopic());
+
+ if (CollectionUtils.isEmpty(groupTopicClients)) {
+ logger.error("group {} topic {} clients is empty", consumerGroup, subscription);
+ }
+
+ ConsumerGroupConf consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
+ if (consumerGroupConf == null) {
+ // new subscription
+ ConsumerGroupConf prev = localConsumerGroupMapping.putIfAbsent(consumerGroup, new ConsumerGroupConf(consumerGroup));
+ if (prev == null) {
+ logger.info("add new subscription, consumer group: {}", consumerGroup);
+ }
+ consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
+ }
+
+ ConsumerGroupTopicConf consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf()
+ .get(subscription.getTopic());
+ if (consumerGroupTopicConf == null) {
+ consumerGroupConf.getConsumerGroupTopicConf().computeIfAbsent(subscription.getTopic(), (topic) -> {
+ ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
+ newTopicConf.setConsumerGroup(consumerGroup);
+ newTopicConf.setTopic(topic);
+ newTopicConf.setSubscriptionItem(subscription);
+ logger.info("add new {}", newTopicConf);
+ return newTopicConf;
+ });
+ consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf().get(subscription.getTopic());
+ }
+
+ consumerGroupTopicConf.getUrls().add(url);
+ if (!consumerGroupTopicConf.getIdcUrls().containsKey(clientInfo.getIdc())) {
+ consumerGroupTopicConf.getIdcUrls().putIfAbsent(clientInfo.getIdc(), new ArrayList<>());
+ }
+ //TODO: idcUrl list is not thread-safe
+ consumerGroupTopicConf.getIdcUrls().get(clientInfo.getIdc()).add(url);
+ }
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java
index 2c44b5958..289256c76 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.runtime.core.consumergroup;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.Maps;
@@ -27,7 +28,8 @@ public class ConsumerGroupConf implements Serializable {
//eg . 5013-1A0
private String consumerGroup;
- private Map<String, ConsumerGroupTopicConf> consumerGroupTopicConf = Maps.newConcurrentMap();
+ private final ConcurrentHashMap<String/*topic*/, ConsumerGroupTopicConf> consumerGroupTopicConf
+ = new ConcurrentHashMap<String, ConsumerGroupTopicConf>();
public ConsumerGroupConf(String consumerGroup) {
this.consumerGroup = consumerGroup;
@@ -45,10 +47,6 @@ public class ConsumerGroupConf implements Serializable {
return consumerGroupTopicConf;
}
- public void setConsumerGroupTopicConf(Map<String, ConsumerGroupTopicConf> consumerGroupTopicConf) {
- this.consumerGroupTopicConf = consumerGroupTopicConf;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
index 43103672d..a0b2186be 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
@@ -229,9 +229,7 @@ public final class HttpClientGroupMapping {
urls.add(url);
idcUrls.put(clientIdc, urls);
consumeTopicConfig.setIdcUrls(idcUrls);
- final Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
- map.put(subTopic.getTopic(), consumeTopicConfig);
- consumerGroupConf.setConsumerGroupTopicConf(map);
+ consumerGroupConf.getConsumerGroupTopicConf().put(subTopic.getTopic(), consumeTopicConfig);
localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
isChange = true;
} else {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index a606f8e04..3bd84f055 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -162,17 +162,17 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
}
- synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+ synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
for (final Map.Entry<String, List<Client>> groupTopicClientMapping : tmpMap.entrySet()) {
final List<Client> localClientList =
- eventMeshHTTPServer.localClientInfoMapping.get(groupTopicClientMapping.getKey());
+ eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicClientMapping.getKey());
if (CollectionUtils.isEmpty(localClientList)) {
- eventMeshHTTPServer.localClientInfoMapping
+ eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()
.put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue());
} else {
final List<Client> tmpClientList = groupTopicClientMapping.getValue();
supplyClientInfoList(tmpClientList, localClientList);
- eventMeshHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), localClientList);
+ eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicClientMapping.getKey(), localClientList);
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
index afc2cad28..3a2e57845 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
@@ -29,21 +29,14 @@ import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.common.EventMeshTrace;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.consumer.ClientInfo;
+import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor;
-import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.WebhookUtil;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -173,87 +166,17 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
return;
}
- synchronized (eventMeshHTTPServer.localClientInfoMapping) {
-
- registerClient(requestWrapper, consumerGroup, subscriptionList, url);
-
- for (final SubscriptionItem subTopic : subscriptionList) {
- final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
- .get(consumerGroup + "@" + subTopic.getTopic());
-
- if (CollectionUtils.isEmpty(groupTopicClients)) {
- if (log.isErrorEnabled()) {
- log.error("group {} topic {} clients is empty", consumerGroup, subTopic);
- }
- }
-
- final Map<String, List<String>> idcUrls = new HashMap<>();
- for (final Client client : groupTopicClients) {
- if (idcUrls.containsKey(client.getIdc())) {
- idcUrls.get(client.getIdc()).add(StringUtils.deleteWhitespace(client.getUrl()));
- } else {
- final List<String> urls = new ArrayList<>();
- urls.add(client.getUrl());
- idcUrls.put(client.getIdc(), urls);
- }
- }
-
- ConsumerGroupConf consumerGroupConf =
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
- if (consumerGroupConf == null) {
- // new subscription
- consumerGroupConf = new ConsumerGroupConf(consumerGroup);
- final ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
- consumeTopicConfig.setConsumerGroup(consumerGroup);
- consumeTopicConfig.setTopic(subTopic.getTopic());
- consumeTopicConfig.setSubscriptionItem(subTopic);
- consumeTopicConfig.setUrls(new HashSet<>(Collections.singletonList(url)));
- consumeTopicConfig.setIdcUrls(idcUrls);
-
- final Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
- map.put(subTopic.getTopic(), consumeTopicConfig);
- consumerGroupConf.setConsumerGroupTopicConf(map);
- } else {
- // already subscribed
- final Map<String, ConsumerGroupTopicConf> map =
- consumerGroupConf.getConsumerGroupTopicConf();
- if (!map.containsKey(subTopic.getTopic())) {
- //If there are multiple topics, append it
- final ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
- newTopicConf.setConsumerGroup(consumerGroup);
- newTopicConf.setTopic(subTopic.getTopic());
- newTopicConf.setSubscriptionItem(subTopic);
- newTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
- newTopicConf.setIdcUrls(idcUrls);
- map.put(subTopic.getTopic(), newTopicConf);
- }
-
- for (final Map.Entry<String, ConsumerGroupTopicConf> set : map.entrySet()) {
- if (!StringUtils.equals(subTopic.getTopic(), set.getKey())) {
- continue;
- }
-
- final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
- latestTopicConf.setConsumerGroup(consumerGroup);
- latestTopicConf.setTopic(subTopic.getTopic());
- latestTopicConf.setSubscriptionItem(subTopic);
- latestTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
-
- final ConsumerGroupTopicConf currentTopicConf = set.getValue();
- latestTopicConf.getUrls().addAll(currentTopicConf.getUrls());
- latestTopicConf.setIdcUrls(idcUrls);
-
- map.put(set.getKey(), latestTopicConf);
- }
- }
- eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
- }
+ synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
+ ClientInfo clientInfo = getClientInfo(requestWrapper);
+ SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
+ subscriptionManager.registerClient(clientInfo, consumerGroup, subscriptionList, url);
+ subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subscriptionList);
final long startTime = System.currentTimeMillis();
try {
// subscription relationship change notification
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
responseBodyMap.put("retCode", EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put("retMsg", EventMeshRetCode.SUCCESS.getErrMsg());
@@ -282,44 +205,14 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
}
- private void registerClient(final HttpEventWrapper requestWrapper, final String consumerGroup,
- final List<SubscriptionItem> subscriptionItems, final String url) {
+ private ClientInfo getClientInfo(final HttpEventWrapper requestWrapper) {
final Map<String, Object> requestHeaderMap = requestWrapper.getSysHeaderMap();
- for (final SubscriptionItem item : subscriptionItems) {
- final Client client = new Client();
- client.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString());
- client.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString());
- client.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString());
- client.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString());
- client.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
- client.setConsumerGroup(consumerGroup);
- client.setTopic(item.getTopic());
- client.setUrl(url);
- client.setLastUpTime(new Date());
-
- final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
-
- List<Client> localClients =
- eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
- if (localClients == null) {
- localClients = new ArrayList<>();
- eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, localClients);
- }
-
- boolean isContains = false;
- for (final Client localClient : localClients) {
- if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
- isContains = true;
- localClient.setLastUpTime(client.getLastUpTime());
- break;
- }
- }
-
- if (!isContains) {
- localClients.add(client);
- }
-
- }
+ ClientInfo clientInfo = new ClientInfo();
+ clientInfo.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString());
+ clientInfo.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString());
+ clientInfo.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString());
+ clientInfo.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString());
+ clientInfo.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
+ return clientInfo;
}
-
}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
index ae703436f..1810b842b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
@@ -127,13 +127,13 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
final String pid = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString();
- synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+ synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
boolean isChange = true;
registerClient(requestWrapper, consumerGroup, unSubTopicList, unSubscribeUrl);
for (final String unSubTopic : unSubTopicList) {
- final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+ final List<Client> groupTopicClients = eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()
.get(consumerGroup + "@" + unSubTopic);
final Iterator<Client> clientIterator = groupTopicClients.iterator();
while (clientIterator.hasNext()) {
@@ -167,9 +167,9 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
}
- synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
+ synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()) {
final ConsumerGroupConf consumerGroupConf =
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup);
final Map<String, ConsumerGroupTopicConf> map =
consumerGroupConf.getConsumerGroupTopicConf();
for (final Map.Entry<String, ConsumerGroupTopicConf> entry : map.entrySet()) {
@@ -184,7 +184,7 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
map.put(unSubTopic, latestTopicConf);
}
}
- eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().put(consumerGroup, consumerGroupConf);
}
} else {
isChange = false;
@@ -195,7 +195,7 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
if (isChange) {
try {
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg());
@@ -221,10 +221,10 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
// clean ClientInfo
- eventMeshHTTPServer.localClientInfoMapping.keySet()
+ eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().keySet()
.removeIf(s -> StringUtils.contains(s, consumerGroup));
// clean ConsumerGroupInfo
- eventMeshHTTPServer.localConsumerGroupMapping.keySet()
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().keySet()
.removeIf(s -> StringUtils.equals(consumerGroup, s));
} catch (Exception e) {
if (log.isErrorEnabled()) {
@@ -272,11 +272,11 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
List<Client> localClients =
- eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+ eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicKey);
if (localClients == null) {
localClients = new ArrayList<>();
- eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, localClients);
+ eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicKey, localClients);
}
boolean isContains = false;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index f1935815a..3f6c24387 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -31,11 +31,10 @@ import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.consumer.ClientInfo;
+import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
-import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
@@ -44,13 +43,7 @@ import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,85 +182,17 @@ public class SubscribeProcessor implements HttpRequestProcessor {
return;
}
- synchronized (eventMeshHTTPServer.localClientInfoMapping) {
-
- registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
-
- for (final SubscriptionItem subTopic : subTopicList) {
- final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
- .get(consumerGroup + "@" + subTopic.getTopic());
-
- if (CollectionUtils.isEmpty(groupTopicClients)) {
- LOGGER.error("group {} topic {} clients is empty", consumerGroup, subTopic);
- }
-
- final Map<String, List<String>> idcUrls = new HashMap<>();
- for (final Client client : groupTopicClients) {
- List<String> urls = idcUrls.get(client.getIdc());
- if (urls == null) {
- urls = new ArrayList<>();
- idcUrls.put(client.getIdc(), urls);
- }
- urls.add(StringUtils.deleteWhitespace(client.getUrl()));
- }
-
- ConsumerGroupConf consumerGroupConf =
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
-
- if (consumerGroupConf == null) {
- // new subscription
- consumerGroupConf = new ConsumerGroupConf(consumerGroup);
- final ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
- consumeTopicConfig.setConsumerGroup(consumerGroup);
- consumeTopicConfig.setTopic(subTopic.getTopic());
- consumeTopicConfig.setSubscriptionItem(subTopic);
- consumeTopicConfig.setUrls(new HashSet<>(Collections.singletonList(url)));
-
- consumeTopicConfig.setIdcUrls(idcUrls);
-
- final Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
- map.put(subTopic.getTopic(), consumeTopicConfig);
- consumerGroupConf.setConsumerGroupTopicConf(map);
- } else {
- // already subscribed
- final Map<String, ConsumerGroupTopicConf> map =
- consumerGroupConf.getConsumerGroupTopicConf();
-
- if (!map.containsKey(subTopic.getTopic())) {
- //If there are multiple topics, append it
- final ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
- newTopicConf.setConsumerGroup(consumerGroup);
- newTopicConf.setTopic(subTopic.getTopic());
- newTopicConf.setSubscriptionItem(subTopic);
- newTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
- newTopicConf.setIdcUrls(idcUrls);
- map.put(subTopic.getTopic(), newTopicConf);
- }
-
- for (final Map.Entry<String, ConsumerGroupTopicConf> set : map.entrySet()) {
- if (StringUtils.equals(subTopic.getTopic(), set.getKey())) {
- final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
- latestTopicConf.setConsumerGroup(consumerGroup);
- latestTopicConf.setTopic(subTopic.getTopic());
- latestTopicConf.setSubscriptionItem(subTopic);
- latestTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
-
- final ConsumerGroupTopicConf currentTopicConf = set.getValue();
- latestTopicConf.getUrls().addAll(currentTopicConf.getUrls());
- latestTopicConf.setIdcUrls(idcUrls);
-
- map.put(set.getKey(), latestTopicConf);
- }
- }
- }
- eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
- }
+ synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
+ ClientInfo clientInfo = getClientInfo(subscribeRequestHeader);
+ SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
+ subscriptionManager.registerClient(clientInfo, consumerGroup, subTopicList, url);
+ subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subTopicList);
final long startTime = System.currentTimeMillis();
try {
// subscription relationship change notification
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() {
@Override
@@ -316,44 +241,13 @@ public class SubscribeProcessor implements HttpRequestProcessor {
return false;
}
- private void registerClient(final SubscribeRequestHeader subscribeRequestHeader, final String consumerGroup,
- final List<SubscriptionItem> subscriptionItems, final String url) {
- for (final SubscriptionItem item : subscriptionItems) {
- final Client client = new Client();
- client.setEnv(subscribeRequestHeader.getEnv());
- client.setIdc(subscribeRequestHeader.getIdc());
- client.setSys(subscribeRequestHeader.getSys());
- client.setIp(subscribeRequestHeader.getIp());
- client.setPid(subscribeRequestHeader.getPid());
- client.setConsumerGroup(consumerGroup);
- client.setTopic(item.getTopic());
- client.setUrl(url);
- client.setLastUpTime(new Date());
-
- final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
-
- List<Client> localClients =
- eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
-
- if (localClients == null) {
- localClients = new ArrayList<>();
- eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, localClients);
- }
-
- boolean isContains = false;
- for (final Client localClient : localClients) {
- if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
- isContains = true;
- localClient.setLastUpTime(client.getLastUpTime());
- break;
- }
- }
-
- if (!isContains) {
- localClients.add(client);
- }
-
- }
+ private ClientInfo getClientInfo(final SubscribeRequestHeader subscribeRequestHeader) {
+ ClientInfo clientInfo = new ClientInfo();
+ clientInfo.setEnv(subscribeRequestHeader.getEnv());
+ clientInfo.setIdc(subscribeRequestHeader.getIdc());
+ clientInfo.setSys(subscribeRequestHeader.getSys());
+ clientInfo.setIp(subscribeRequestHeader.getIp());
+ clientInfo.setPid(subscribeRequestHeader.getPid());
+ return clientInfo;
}
-
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index 3f6e6c18d..0e6049d62 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -136,13 +136,13 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
}
};
- synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+ synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
boolean isChange = true;
registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl);
for (final String unSubTopic : unSubTopicList) {
- final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+ final List<Client> groupTopicClients = eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()
.get(consumerGroup + "@" + unSubTopic);
final Iterator<Client> clientIterator = groupTopicClients.iterator();
@@ -173,9 +173,9 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
}
}
- synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
+ synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()) {
final ConsumerGroupConf consumerGroupConf =
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup);
final Map<String, ConsumerGroupTopicConf> map =
consumerGroupConf.getConsumerGroupTopicConf();
@@ -195,7 +195,7 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
map.put(unSubTopic, latestTopicConf);
}
}
- eventMeshHTTPServer.localConsumerGroupMapping
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()
.put(consumerGroup, consumerGroupConf);
}
} else {
@@ -208,7 +208,7 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
if (isChange) {
try {
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
responseEventMeshCommand =
asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS);
@@ -239,10 +239,10 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
// clean ClientInfo
- eventMeshHTTPServer.localClientInfoMapping.keySet()
+ eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().keySet()
.removeIf(s -> StringUtils.contains(s, consumerGroup));
// clean ConsumerGroupInfo
- eventMeshHTTPServer.localConsumerGroupMapping.keySet()
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().keySet()
.removeIf(s -> StringUtils.equals(consumerGroup, s));
} catch (Exception e) {
final HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
@@ -286,9 +286,9 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
client.setLastUpTime(new Date());
final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
- if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
+ if (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().containsKey(groupTopicKey)) {
final List<Client> localClients =
- eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+ eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicKey);
boolean isContains = false;
for (final Client localClient : localClients) {
if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
@@ -303,7 +303,7 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
} else {
final List<Client> clients = new ArrayList<>();
clients.add(client);
- eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients);
+ eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicKey, clients);
}
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
index 4cc65a77b..5828b5248 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
@@ -84,7 +84,7 @@ public abstract class AbstractEventProcessor implements AsyncHttpProcessor {
Map<String, String> metadata = new HashMap<>(1 << 4);
for (Map.Entry<String, ConsumerGroupConf> consumerGroupMap :
- eventMeshHTTPServer.localConsumerGroupMapping.entrySet()) {
+ eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().entrySet()) {
String consumerGroupKey = consumerGroupMap.getKey();
ConsumerGroupConf consumerGroupConf = consumerGroupMap.getValue();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org