You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/02 05:34:48 UTC

[incubator-eventmesh] branch master updated: fix issue2654

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new a140e18b3 fix issue2654
     new 5d2de1609 Merge pull request #2772 from jonyangx/issue2654
a140e18b3 is described below

commit a140e18b3d5d8f2c7381af726d35a1d7f20c81cc
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sun Jan 1 09:27:52 2023 +0800

    fix issue2654
---
 .../http/consumer/HttpClientGroupMapping.java      |  48 ++---
 .../http/processor/HeartBeatProcessor.java         |  34 ++--
 .../processor/LocalSubscribeEventProcessor.java    | 224 ++++++++++-----------
 .../processor/LocalUnSubscribeEventProcessor.java  |  44 ++--
 .../http/processor/SubscribeProcessor.java         |  34 ++--
 .../http/processor/UnSubscribeProcessor.java       |  42 ++--
 .../core/protocol/http/processor/inf/Client.java   | 111 +++++++++-
 7 files changed, 310 insertions(+), 227 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
index 301cd1e6c..582c61d39 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
@@ -322,23 +322,23 @@ public class HttpClientGroupMapping {
                                       final List<SubscriptionItem> subscriptionItems, String url) {
         for (SubscriptionItem item : subscriptionItems) {
             Client client = new Client();
-            client.env = subscribeRequestHeader.getEnv();
-            client.idc = subscribeRequestHeader.getIdc();
-            client.sys = subscribeRequestHeader.getSys();
-            client.ip = subscribeRequestHeader.getIp();
-            client.pid = subscribeRequestHeader.getPid();
-            client.consumerGroup = consumerGroup;
-            client.topic = item.getTopic();
-            client.url = url;
-            client.lastUpTime = new Date();
-            String groupTopicKey = client.consumerGroup + "@" + client.topic;
+            client.setEnv(subscribeRequestHeader.getEnv());
+            client.setIdc(subscribeRequestHeader.getIdc());
+            client.setSys(subscribeRequestHeader.getSys());
+            client.setIp(subscribeRequestHeader.getIp());
+            client.setPid(subscribeRequestHeader.getPid());
+            client.setConsumerGroup(consumerGroup);
+            client.setTopic(item.getTopic());
+            client.setUrl(url);
+            client.setLastUpTime(new Date());
+            String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
             if (localClientInfoMapping.containsKey(groupTopicKey)) {
                 List<Client> localClients = localClientInfoMapping.get(groupTopicKey);
                 boolean isContains = false;
                 for (Client localClient : localClients) {
-                    if (StringUtils.equals(localClient.url, client.url)) {
+                    if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
                         isContains = true;
-                        localClient.lastUpTime = client.lastUpTime;
+                        localClient.setLastUpTime(client.getLastUpTime());
                         break;
                     }
                 }
@@ -381,25 +381,25 @@ public class HttpClientGroupMapping {
                                         final List<String> topicList, String url) {
         for (String topic : topicList) {
             Client client = new Client();
-            client.env = unSubscribeRequestHeader.getEnv();
-            client.idc = unSubscribeRequestHeader.getIdc();
-            client.sys = unSubscribeRequestHeader.getSys();
-            client.ip = unSubscribeRequestHeader.getIp();
-            client.pid = unSubscribeRequestHeader.getPid();
-            client.consumerGroup = consumerGroup;
-            client.topic = topic;
-            client.url = url;
-            client.lastUpTime = new Date();
-            String groupTopicKey = client.consumerGroup + "@" + client.topic;
+            client.setEnv(unSubscribeRequestHeader.getEnv());
+            client.setIdc(unSubscribeRequestHeader.getIdc());
+            client.setSys(unSubscribeRequestHeader.getSys());
+            client.setIp(unSubscribeRequestHeader.getIp());
+            client.setPid(unSubscribeRequestHeader.getPid());
+            client.setConsumerGroup(consumerGroup);
+            client.setTopic(topic);
+            client.setUrl(url);
+            client.setLastUpTime(new Date());
+            String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
 
             if (localClientInfoMapping.containsKey(groupTopicKey)) {
                 List<Client> localClients =
                         localClientInfoMapping.get(groupTopicKey);
                 boolean isContains = false;
                 for (Client localClient : localClients) {
-                    if (StringUtils.equals(localClient.url, client.url)) {
+                    if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
                         isContains = true;
-                        localClient.lastUpTime = client.lastUpTime;
+                        localClient.setLastUpTime(client.getLastUpTime());
                         break;
                     }
                 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index 083f2167c..f439bcada 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -115,18 +115,18 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
         for (HeartbeatRequestBody.HeartbeatEntity heartbeatEntity : heartbeatEntities) {
 
             Client client = new Client();
-            client.env = env;
-            client.idc = idc;
-            client.sys = sys;
-            client.ip = ip;
-            client.pid = pid;
-            client.consumerGroup = consumerGroup;
-            client.topic = heartbeatEntity.topic;
-            client.url = heartbeatEntity.url;
-
-            client.lastUpTime = new Date();
-
-            if (StringUtils.isBlank(client.topic)) {
+            client.setEnv(env);
+            client.setIdc(idc);
+            client.setSys(sys);
+            client.setIp(ip);
+            client.setPid(pid);
+            client.setConsumerGroup(consumerGroup);
+            client.setTopic(heartbeatEntity.topic);
+            client.setUrl(heartbeatEntity.url);
+
+            client.setLastUpTime(new Date());
+
+            if (StringUtils.isBlank(client.getTopic())) {
                 continue;
             }
 
@@ -137,7 +137,7 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
                 String pass = heartbeatRequestHeader.getPasswd();
                 int requestCode = Integer.parseInt(heartbeatRequestHeader.getCode());
                 try {
-                    Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, client.topic, requestCode);
+                    Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, client.getTopic(), requestCode);
                 } catch (Exception e) {
                     responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
                             heartbeatResponseHeader,
@@ -149,11 +149,11 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
                 }
             }
 
-            if (StringUtils.isBlank(client.url)) {
+            if (StringUtils.isBlank(client.getUrl())) {
                 continue;
             }
 
-            String groupTopicKey = client.consumerGroup + "@" + client.topic;
+            String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
 
             if (tmp.containsKey(groupTopicKey)) {
                 tmp.get(groupTopicKey).add(client);
@@ -219,9 +219,9 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
         for (Client tmpClient : tmpClientList) {
             boolean isContains = false;
             for (Client localClient : localClientList) {
-                if (StringUtils.equals(localClient.url, tmpClient.url)) {
+                if (StringUtils.equals(localClient.getUrl(), tmpClient.getUrl())) {
                     isContains = true;
-                    localClient.lastUpTime = tmpClient.lastUpTime;
+                    localClient.setLastUpTime(tmpClient.getLastUpTime());
                     break;
                 }
             }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
index 2b2ad88d5..9c93fefb7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
@@ -31,7 +31,6 @@ import org.apache.eventmesh.runtime.common.EventMeshTrace;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
-import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
@@ -48,12 +47,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.Channel;
 import io.netty.handler.codec.http.HttpRequest;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -62,86 +60,85 @@ import com.fasterxml.jackson.core.type.TypeReference;
 @EventMeshTrace(isEnable = false)
 public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
 
-    public Logger httpLogger = LoggerFactory.getLogger("http");
+    private static final Logger LOGGER = LoggerFactory.getLogger(LocalSubscribeEventProcessor.class);
 
-    public Logger aclLogger = LoggerFactory.getLogger("acl");
-
-    public LocalSubscribeEventProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
+    public LocalSubscribeEventProcessor(final EventMeshHTTPServer eventMeshHTTPServer) {
         super(eventMeshHTTPServer);
     }
 
     @Override
-    public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest httpRequest) throws Exception {
-
-        AsyncContext<HttpEventWrapper> asyncContext = handlerSpecific.getAsyncContext();
+    public void handler(final HandlerService.HandlerSpecific handlerSpecific, final HttpRequest httpRequest)
+            throws Exception {
 
-        ChannelHandlerContext ctx = handlerSpecific.getCtx();
+        final Channel channel = handlerSpecific.getCtx().channel();
+        final HttpEventWrapper requestWrapper = handlerSpecific.getAsyncContext().getRequest();
 
-        HttpEventWrapper requestWrapper = asyncContext.getRequest();
-
-        httpLogger.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
-            EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress()
-        );
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
+                    EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channel),
+                    IPUtils.getLocalAddress());
+        }
 
         // user request header
-        Map<String, Object> userRequestHeaderMap = requestWrapper.getHeaderMap();
-        String requestIp = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-        userRequestHeaderMap.put(ProtocolKey.ClientInstanceKey.IP, requestIp);
-
+        requestWrapper.getHeaderMap().put(ProtocolKey.ClientInstanceKey.IP,
+                RemotingHelper.parseChannelRemoteAddr(channel));
         // build sys header
         requestWrapper.buildSysHeaderForClient();
 
-        Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
-
-        Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
-
-        Map<String, Object> responseBodyMap = new HashMap<>();
+        final Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
+        final Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
+        final Map<String, Object> responseBodyMap = new HashMap<>();
 
         //validate header
         if (validateSysHeader(sysHeaderMap)) {
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
-                responseBodyMap, null);
+                    responseBodyMap, null);
             return;
         }
 
         //validate body
-        byte[] requestBody = requestWrapper.getBody();
-
-        Map<String, Object> requestBodyMap = Optional.ofNullable(JsonUtils.deserialize(
-            new String(requestBody, Constants.DEFAULT_CHARSET),
-            new TypeReference<HashMap<String, Object>>() {}
-        )).orElse(new HashMap<>());
+        final Map<String, Object> requestBodyMap = Optional.ofNullable(JsonUtils.deserialize(
+                new String(requestWrapper.getBody(), Constants.DEFAULT_CHARSET),
+                new TypeReference<HashMap<String, Object>>() {
+                }
+        )).orElseGet(HashMap::new);
 
-        if (requestBodyMap.get("url") == null || requestBodyMap.get("topic") == null || requestBodyMap.get("consumerGroup") == null) {
+        if (requestBodyMap.get("url") == null
+                || requestBodyMap.get("topic") == null
+                || requestBodyMap.get("consumerGroup") == null) {
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
-                responseBodyMap, null);
+                    responseBodyMap, null);
             return;
         }
 
-        String url = requestBodyMap.get("url").toString();
-        String consumerGroup = requestBodyMap.get("consumerGroup").toString();
-        String topic = JsonUtils.serialize(requestBodyMap.get("topic"));
+        final String url = requestBodyMap.get("url").toString();
+        final String consumerGroup = requestBodyMap.get("consumerGroup").toString();
+        final String topic = JsonUtils.serialize(requestBodyMap.get("topic"));
 
         // SubscriptionItem
-        List<SubscriptionItem> subscriptionList = Optional.ofNullable(JsonUtils.deserialize(
-            topic,
-            new TypeReference<List<SubscriptionItem>>() {}
-        )).orElse(Collections.emptyList());
+        final List<SubscriptionItem> subscriptionList = Optional.ofNullable(JsonUtils.deserialize(
+                topic,
+                new TypeReference<List<SubscriptionItem>>() {
+                }
+        )).orElseGet(Collections::emptyList);
 
         //do acl check
         if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
-            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            String user = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.USERNAME).toString();
-            String pass = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PASSWD).toString();
-            String subsystem = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString();
-            for (SubscriptionItem item : subscriptionList) {
+            for (final SubscriptionItem item : subscriptionList) {
                 try {
-                    Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(),
-                        requestWrapper.getRequestURI());
+                    Acl.doAclCheckInHttpReceive(RemotingHelper.parseChannelRemoteAddr(channel),
+                            sysHeaderMap.get(ProtocolKey.ClientInstanceKey.USERNAME).toString(),
+                            sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PASSWD).toString(),
+                            sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString(),
+                            item.getTopic(),
+                            requestWrapper.getRequestURI());
                 } catch (Exception e) {
-                    aclLogger.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+                    if (LOGGER.isWarnEnabled()) {
+                        LOGGER.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+                    }
+
                     handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap,
-                        responseBodyMap, null);
+                            responseBodyMap, null);
                     return;
                 }
             }
@@ -150,27 +147,25 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
         // validate URL
         try {
             if (!IPUtils.isValidDomainOrIp(url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv4BlackList,
-                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
-                httpLogger.error("subscriber url {} is not valid", url);
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
+                LOGGER.error("subscriber url {} is not valid", url);
                 handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
-                    responseBodyMap, null);
+                        responseBodyMap, null);
                 return;
             }
         } catch (Exception e) {
-            httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage());
+            LOGGER.error("subscriber url is invalid, url:" + url, e);
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
-                responseBodyMap, null);
+                    responseBodyMap, null);
             return;
         }
 
         // obtain webhook delivery agreement for Abuse Protection
-        boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
-            url, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshWebhookOrigin());
-
-        if (!isWebhookAllowed) {
-            httpLogger.error("subscriber url {} is not allowed by the target system", url);
+        if (!WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
+                url, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshWebhookOrigin())) {
+            LOGGER.error("subscriber url {} is not allowed by the target system", url);
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
-                responseBodyMap, null);
+                    responseBodyMap, null);
             return;
         }
 
@@ -178,47 +173,47 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
 
             registerClient(requestWrapper, consumerGroup, subscriptionList, url);
 
-            for (SubscriptionItem subTopic : subscriptionList) {
-                List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
-                    .get(consumerGroup + "@" + subTopic.getTopic());
+            for (final SubscriptionItem subTopic : subscriptionList) {
+                final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+                        .get(consumerGroup + "@" + subTopic.getTopic());
 
                 if (CollectionUtils.isEmpty(groupTopicClients)) {
-                    httpLogger.error("group {} topic {} clients is empty", consumerGroup, subTopic);
+                    LOGGER.error("group {} topic {} clients is empty", consumerGroup, subTopic);
                 }
 
-                Map<String, List<String>> idcUrls = new HashMap<>();
-                for (Client client : groupTopicClients) {
-                    if (idcUrls.containsKey(client.idc)) {
-                        idcUrls.get(client.idc).add(StringUtils.deleteWhitespace(client.url));
+                final Map<String, List<String>> idcUrls = new HashMap<>();
+                for (final Client client : groupTopicClients) {
+                    if (idcUrls.containsKey(client.getIdc())) {
+                        idcUrls.get(client.getIdc()).add(StringUtils.deleteWhitespace(client.getUrl()));
                     } else {
-                        List<String> urls = new ArrayList<>();
-                        urls.add(client.url);
-                        idcUrls.put(client.idc, urls);
+                        final List<String> urls = new ArrayList<>();
+                        urls.add(client.getUrl());
+                        idcUrls.put(client.getIdc(), urls);
                     }
                 }
+
                 ConsumerGroupConf consumerGroupConf =
-                    eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+                        eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
                 if (consumerGroupConf == null) {
                     // new subscription
                     consumerGroupConf = new ConsumerGroupConf(consumerGroup);
-                    ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
+                    final ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
                     consumeTopicConfig.setConsumerGroup(consumerGroup);
                     consumeTopicConfig.setTopic(subTopic.getTopic());
                     consumeTopicConfig.setSubscriptionItem(subTopic);
                     consumeTopicConfig.setUrls(new HashSet<>(Collections.singletonList(url)));
-
                     consumeTopicConfig.setIdcUrls(idcUrls);
 
-                    Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
+                    final Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
                     map.put(subTopic.getTopic(), consumeTopicConfig);
                     consumerGroupConf.setConsumerGroupTopicConf(map);
                 } else {
                     // already subscribed
-                    Map<String, ConsumerGroupTopicConf> map =
-                        consumerGroupConf.getConsumerGroupTopicConf();
+                    final Map<String, ConsumerGroupTopicConf> map =
+                            consumerGroupConf.getConsumerGroupTopicConf();
                     if (!map.containsKey(subTopic.getTopic())) {
                         //If there are multiple topics, append it
-                        ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
+                        final ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
                         newTopicConf.setConsumerGroup(consumerGroup);
                         newTopicConf.setTopic(subTopic.getTopic());
                         newTopicConf.setSubscriptionItem(subTopic);
@@ -226,19 +221,19 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
                         newTopicConf.setIdcUrls(idcUrls);
                         map.put(subTopic.getTopic(), newTopicConf);
                     }
-                    Set<Map.Entry<String, ConsumerGroupTopicConf>> entrySet = map.entrySet();
-                    for (Map.Entry<String, ConsumerGroupTopicConf> set : entrySet) {
+
+                    for (final Map.Entry<String, ConsumerGroupTopicConf> set : map.entrySet()) {
                         if (!StringUtils.equals(subTopic.getTopic(), set.getKey())) {
                             continue;
                         }
 
-                        ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
+                        final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
                         latestTopicConf.setConsumerGroup(consumerGroup);
                         latestTopicConf.setTopic(subTopic.getTopic());
                         latestTopicConf.setSubscriptionItem(subTopic);
                         latestTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
 
-                        ConsumerGroupTopicConf currentTopicConf = set.getValue();
+                        final ConsumerGroupTopicConf currentTopicConf = set.getValue();
                         latestTopicConf.getUrls().addAll(currentTopicConf.getUrls());
                         latestTopicConf.setIdcUrls(idcUrls);
 
@@ -248,24 +243,22 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
                 eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
             }
 
-            long startTime = System.currentTimeMillis();
+            final long startTime = System.currentTimeMillis();
             try {
                 // subscription relationship change notification
                 eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
-                    eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
-
+                        eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
                 responseBodyMap.put("retCode", EventMeshRetCode.SUCCESS.getRetCode());
                 responseBodyMap.put("retMsg", EventMeshRetCode.SUCCESS.getErrMsg());
 
                 handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
 
             } catch (Exception e) {
-                long endTime = System.currentTimeMillis();
-                httpLogger.error(
-                    "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}"
-                        + "|bizSeqNo={}|uniqueId={}", endTime - startTime,
-                    JsonUtils.serialize(subscriptionList), url, e);
-                handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null);
+                LOGGER.error(String.format("message|eventMesh2mq|REQ|ASYNC|send2MQCost=%s ms|topic=%s"
+                                + "|uniqueId=%s", System.currentTimeMillis() - startTime,
+                        JsonUtils.serialize(subscriptionList), url), e);
+                handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR, responseHeaderMap,
+                        responseBodyMap, null);
             }
 
             // Update service metadata
@@ -276,42 +269,43 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
 
     @Override
     public String[] paths() {
-        return new String[] {RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
+        return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
     }
 
-    private void registerClient(HttpEventWrapper requestWrapper, String consumerGroup,
-                                List<SubscriptionItem> subscriptionItems, String url) {
-        Map<String, Object> requestHeaderMap = requestWrapper.getSysHeaderMap();
-        for (SubscriptionItem item : subscriptionItems) {
-            Client client = new Client();
-            client.env = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString();
-            client.idc = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString();
-            client.sys = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString();
-            client.ip = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString();
-            client.pid = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString();
-            client.consumerGroup = consumerGroup;
-            client.topic = item.getTopic();
-            client.url = url;
-            client.lastUpTime = new Date();
-
-            String groupTopicKey = client.consumerGroup + "@" + client.topic;
+    private void registerClient(final HttpEventWrapper requestWrapper, final String consumerGroup,
+                                final List<SubscriptionItem> subscriptionItems, final String url) {
+        final Map<String, Object> requestHeaderMap = requestWrapper.getSysHeaderMap();
+        for (final SubscriptionItem item : subscriptionItems) {
+            final Client client = new Client();
+            client.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString());
+            client.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString());
+            client.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString());
+            client.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString());
+            client.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
+            client.setConsumerGroup(consumerGroup);
+            client.setTopic(item.getTopic());
+            client.setUrl(url);
+            client.setLastUpTime(new Date());
+
+            final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
 
             if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
-                List<Client> localClients =
-                    eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+                final List<Client> localClients =
+                        eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
                 boolean isContains = false;
-                for (Client localClient : localClients) {
-                    if (StringUtils.equals(localClient.url, client.url)) {
+                for (final Client localClient : localClients) {
+                    if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
                         isContains = true;
-                        localClient.lastUpTime = client.lastUpTime;
+                        localClient.setLastUpTime(client.getLastUpTime());
                         break;
                     }
                 }
+
                 if (!isContains) {
                     localClients.add(client);
                 }
             } else {
-                List<Client> clients = new ArrayList<>();
+                final List<Client> clients = new ArrayList<>();
                 clients.add(client);
                 eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients);
             }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
index 4bdc7f70c..8e901b89e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
@@ -142,8 +142,8 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
                 Iterator<Client> clientIterator = groupTopicClients.iterator();
                 while (clientIterator.hasNext()) {
                     Client client = clientIterator.next();
-                    if (StringUtils.equals(client.pid, pid)
-                        && StringUtils.equals(client.url, unSubscribeUrl)) {
+                    if (StringUtils.equals(client.getPid(), pid)
+                        && StringUtils.equals(client.getUrl(), unSubscribeUrl)) {
                         httpLogger.warn("client {} start unsubscribe", JsonUtils.serialize(client));
                         clientIterator.remove();
                     }
@@ -154,15 +154,15 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
                     Set<String> clientUrls = new HashSet<>();
                     for (Client client : groupTopicClients) {
                         // remove subscribed url
-                        if (!StringUtils.equals(unSubscribeUrl, client.url)) {
-                            clientUrls.add(client.url);
-                            if (idcUrls.containsKey(client.idc)) {
-                                idcUrls.get(client.idc)
-                                    .add(StringUtils.deleteWhitespace(client.url));
+                        if (!StringUtils.equals(unSubscribeUrl, client.getUrl())) {
+                            clientUrls.add(client.getUrl());
+                            if (idcUrls.containsKey(client.getIdc())) {
+                                idcUrls.get(client.getIdc())
+                                    .add(StringUtils.deleteWhitespace(client.getUrl()));
                             } else {
                                 List<String> urls = new ArrayList<>();
-                                urls.add(client.url);
-                                idcUrls.put(client.idc, urls);
+                                urls.add(client.getUrl());
+                                idcUrls.put(client.getIdc(), urls);
                             }
                         }
 
@@ -260,25 +260,25 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
         Map<String, Object> requestHeaderMap = requestWrapper.getSysHeaderMap();
         for (String topic : topicList) {
             Client client = new Client();
-            client.env = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString();
-            client.idc = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString();
-            client.sys = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString();
-            client.ip = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString();
-            client.pid = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString();
-            client.consumerGroup = consumerGroup;
-            client.topic = topic;
-            client.url = url;
-            client.lastUpTime = new Date();
-
-            String groupTopicKey = client.consumerGroup + "@" + client.topic;
+            client.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString());
+            client.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString());
+            client.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString());
+            client.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString());
+            client.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
+            client.setConsumerGroup(consumerGroup);
+            client.setTopic(topic);
+            client.setUrl(url);
+            client.setLastUpTime(new Date());
+
+            String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
             if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
                 List<Client> localClients =
                     eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
                 boolean isContains = false;
                 for (Client localClient : localClients) {
-                    if (StringUtils.equals(localClient.url, client.url)) {
+                    if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
                         isContains = true;
-                        localClient.lastUpTime = client.lastUpTime;
+                        localClient.setLastUpTime(client.getLastUpTime());
                         break;
                     }
                 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 6d6f54064..ef0d5ad37 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -202,12 +202,12 @@ public class SubscribeProcessor implements HttpRequestProcessor {
 
                 Map<String, List<String>> idcUrls = new HashMap<>();
                 for (Client client : groupTopicClients) {
-                    if (idcUrls.containsKey(client.idc)) {
-                        idcUrls.get(client.idc).add(StringUtils.deleteWhitespace(client.url));
+                    if (idcUrls.containsKey(client.getIdc())) {
+                        idcUrls.get(client.getIdc()).add(StringUtils.deleteWhitespace(client.getUrl()));
                     } else {
                         List<String> urls = new ArrayList<>();
-                        urls.add(client.url);
-                        idcUrls.put(client.idc, urls);
+                        urls.add(client.getUrl());
+                        idcUrls.put(client.getIdc(), urls);
                     }
                 }
                 ConsumerGroupConf consumerGroupConf =
@@ -312,26 +312,26 @@ public class SubscribeProcessor implements HttpRequestProcessor {
                                 List<SubscriptionItem> subscriptionItems, String url) {
         for (SubscriptionItem item : subscriptionItems) {
             Client client = new Client();
-            client.env = subscribeRequestHeader.getEnv();
-            client.idc = subscribeRequestHeader.getIdc();
-            client.sys = subscribeRequestHeader.getSys();
-            client.ip = subscribeRequestHeader.getIp();
-            client.pid = subscribeRequestHeader.getPid();
-            client.consumerGroup = consumerGroup;
-            client.topic = item.getTopic();
-            client.url = url;
-            client.lastUpTime = new Date();
-
-            String groupTopicKey = client.consumerGroup + "@" + client.topic;
+            client.setEnv(subscribeRequestHeader.getEnv());
+            client.setIdc(subscribeRequestHeader.getIdc());
+            client.setSys(subscribeRequestHeader.getSys());
+            client.setIp(subscribeRequestHeader.getIp());
+            client.setPid(subscribeRequestHeader.getPid());
+            client.setConsumerGroup(consumerGroup);
+            client.setTopic(item.getTopic());
+            client.setUrl(url);
+            client.setLastUpTime(new Date());
+
+            String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
 
             if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
                 List<Client> localClients =
                     eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
                 boolean isContains = false;
                 for (Client localClient : localClients) {
-                    if (StringUtils.equals(localClient.url, client.url)) {
+                    if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
                         isContains = true;
-                        localClient.lastUpTime = client.lastUpTime;
+                        localClient.setLastUpTime(client.getLastUpTime());
                         break;
                     }
                 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index d428df902..419f28d22 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -148,8 +148,8 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                 Iterator<Client> clientIterator = groupTopicClients.iterator();
                 while (clientIterator.hasNext()) {
                     Client client = clientIterator.next();
-                    if (StringUtils.equals(client.pid, pid)
-                            && StringUtils.equals(client.url, unSubscribeUrl)) {
+                    if (StringUtils.equals(client.getPid(), pid)
+                            && StringUtils.equals(client.getUrl(), unSubscribeUrl)) {
                         httpLogger.warn("client {} start unsubscribe", JsonUtils.serialize(client));
                         clientIterator.remove();
                     }
@@ -160,15 +160,15 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                     Set<String> clientUrls = new HashSet<>();
                     for (Client client : groupTopicClients) {
                         // remove subscribed url
-                        if (!StringUtils.equals(unSubscribeUrl, client.url)) {
-                            clientUrls.add(client.url);
-                            if (idcUrls.containsKey(client.idc)) {
-                                idcUrls.get(client.idc)
-                                        .add(StringUtils.deleteWhitespace(client.url));
+                        if (!StringUtils.equals(unSubscribeUrl, client.getUrl())) {
+                            clientUrls.add(client.getUrl());
+                            if (idcUrls.containsKey(client.getIdc())) {
+                                idcUrls.get(client.getIdc())
+                                        .add(StringUtils.deleteWhitespace(client.getUrl()));
                             } else {
                                 List<String> urls = new ArrayList<>();
-                                urls.add(client.url);
-                                idcUrls.put(client.idc, urls);
+                                urls.add(client.getUrl());
+                                idcUrls.put(client.getIdc(), urls);
                             }
                         }
 
@@ -275,25 +275,25 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                                 List<String> topicList, String url) {
         for (String topic : topicList) {
             Client client = new Client();
-            client.env = unSubscribeRequestHeader.getEnv();
-            client.idc = unSubscribeRequestHeader.getIdc();
-            client.sys = unSubscribeRequestHeader.getSys();
-            client.ip = unSubscribeRequestHeader.getIp();
-            client.pid = unSubscribeRequestHeader.getPid();
-            client.consumerGroup = consumerGroup;
-            client.topic = topic;
-            client.url = url;
-            client.lastUpTime = new Date();
+            client.setEnv(unSubscribeRequestHeader.getEnv());
+            client.setIdc(unSubscribeRequestHeader.getIdc());
+            client.setSys(unSubscribeRequestHeader.getSys());
+            client.setIp(unSubscribeRequestHeader.getIp());
+            client.setPid(unSubscribeRequestHeader.getPid());
+            client.setConsumerGroup(consumerGroup);
+            client.setTopic(topic);
+            client.setUrl(url);
+            client.setLastUpTime(new Date());
 
-            String groupTopicKey = client.consumerGroup + "@" + client.topic;
+            String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
             if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
                 List<Client> localClients =
                         eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
                 boolean isContains = false;
                 for (Client localClient : localClients) {
-                    if (StringUtils.equals(localClient.url, client.url)) {
+                    if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
                         isContains = true;
-                        localClient.lastUpTime = client.lastUpTime;
+                        localClient.setLastUpTime(client.getLastUpTime());
                         break;
                     }
                 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
index 119c1dea2..2abf68817 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
@@ -23,27 +23,116 @@ import java.util.Date;
 
 public class Client {
 
-    public String env;
+    private String env;
 
-    public String idc;
+    private String idc;
 
-    public String consumerGroup;
+    private String consumerGroup;
 
-    public String topic;
+    private String topic;
 
-    public String url;
+    private String url;
 
-    public String sys;
+    private String sys;
 
-    public String ip;
+    private String ip;
 
-    public String pid;
+    private String pid;
 
-    public String hostname;
+    private String hostname;
 
-    public String apiVersion;
+    private Date lastUpTime;
+
+    public void setEnv(String env) {
+        this.env = env;
+    }
+
+    public void setIdc(String idc) {
+        this.idc = idc;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public void setSys(String sys) {
+        this.sys = sys;
+    }
+
+    public void setIp(String ip) {
+        this.ip = ip;
+    }
+
+    public void setPid(String pid) {
+        this.pid = pid;
+    }
+
+    public void setHostname(String hostname) {
+        this.hostname = hostname;
+    }
+
+    public void setApiVersion(String apiVersion) {
+        this.apiVersion = apiVersion;
+    }
+
+    public void setLastUpTime(Date lastUpTime) {
+        this.lastUpTime = lastUpTime;
+    }
+
+    private String apiVersion;
+
+    public String getEnv() {
+        return env;
+    }
+
+    public String getIdc() {
+        return idc;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public String getSys() {
+        return sys;
+    }
+
+    public String getIp() {
+        return ip;
+    }
+
+    public String getPid() {
+        return pid;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public String getApiVersion() {
+        return apiVersion;
+    }
+
+    public Date getLastUpTime() {
+        return lastUpTime;
+    }
 
-    public Date lastUpTime;
 
     @Override
     public String toString() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org