You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/01/05 20:17:24 UTC

[incubator-eventmesh] branch master updated: [ISSUE #2668] Method appears to call the same method on the same object redundantly [RemoteSubscribeEventProcessor] (#2816)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new e808aa269 [ISSUE #2668] Method appears to call the same method on the same object redundantly [RemoteSubscribeEventProcessor] (#2816)
e808aa269 is described below

commit e808aa269b4a9ebe65a47531ae8fd3368aba1c1d
Author: weihubeats <we...@163.com>
AuthorDate: Fri Jan 6 04:17:18 2023 +0800

    [ISSUE #2668] Method appears to call the same method on the same object redundantly [RemoteSubscribeEventProcessor] (#2816)
    
    * Simplify the code
    
    * Simplify the code
---
 .../common/config/CommonConfiguration.java         |   4 +
 .../processor/LocalSubscribeEventProcessor.java    |  44 +++++----
 .../processor/LocalUnSubscribeEventProcessor.java  |   4 +-
 .../processor/RemoteSubscribeEventProcessor.java   | 100 ++++-----------------
 .../processor/RemoteUnSubscribeEventProcessor.java |  70 ++-------------
 .../http/processor/inf/AbstractEventProcessor.java |  93 +++++++++++++++++++
 6 files changed, 144 insertions(+), 171 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 62d2330c3..e0ed6960b 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -67,6 +67,10 @@ public class CommonConfiguration {
     public CommonConfiguration(ConfigurationWrapper configurationWrapper) {
         this.configurationWrapper = configurationWrapper;
     }
+
+    public String getMeshGroup() {
+        return String.join("-", this.eventMeshEnv, this.eventMeshCluster, this.sysID);
+    }
     
 
     public void init() {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
index 5b3fe361b..afc2cad28 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
@@ -48,20 +48,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import io.netty.channel.Channel;
 import io.netty.handler.codec.http.HttpRequest;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
+import lombok.extern.slf4j.Slf4j;
 
-@EventMeshTrace(isEnable = false)
-public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(LocalSubscribeEventProcessor.class);
 
+@EventMeshTrace
+@Slf4j
+public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
+    
     public LocalSubscribeEventProcessor(final EventMeshHTTPServer eventMeshHTTPServer) {
         super(eventMeshHTTPServer);
     }
@@ -73,8 +71,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
         final Channel channel = handlerSpecific.getCtx().channel();
         final HttpEventWrapper requestWrapper = handlerSpecific.getAsyncContext().getRequest();
 
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
+        if (log.isInfoEnabled()) {
+            log.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
                     EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channel),
                     IPUtils.getLocalAddress());
         }
@@ -103,9 +101,7 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
                 }
         )).orElseGet(HashMap::new);
 
-        if (requestBodyMap.get("url") == null
-                || requestBodyMap.get("topic") == null
-                || requestBodyMap.get("consumerGroup") == null) {
+        if (validatedRequestBodyMap(requestBodyMap)) {
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                     responseBodyMap, null);
             return;
@@ -133,8 +129,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
                             item.getTopic(),
                             requestWrapper.getRequestURI());
                 } catch (Exception e) {
-                    if (LOGGER.isWarnEnabled()) {
-                        LOGGER.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+                    if (log.isWarnEnabled()) {
+                        log.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
                     }
 
                     handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap,
@@ -148,8 +144,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
         try {
             if (!IPUtils.isValidDomainOrIp(url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv4BlackList,
                     eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
-                if (LOGGER.isErrorEnabled()) {
-                    LOGGER.error("subscriber url {} is not valid", url);
+                if (log.isErrorEnabled()) {
+                    log.error("subscriber url {} is not valid", url);
                 }
                 
                 handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
@@ -157,8 +153,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
                 return;
             }
         } catch (Exception e) {
-            if (LOGGER.isErrorEnabled()) {
-                LOGGER.error("subscriber url {} is not valid", url, e);
+            if (log.isErrorEnabled()) {
+                log.error("subscriber url {} is not valid", url, e);
             }
 
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
@@ -169,8 +165,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
         // obtain webhook delivery agreement for Abuse Protection
         if (!WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
                 url, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshWebhookOrigin())) {
-            if (LOGGER.isErrorEnabled()) {
-                LOGGER.error("subscriber url {} is not allowed by the target system", url);
+            if (log.isErrorEnabled()) {
+                log.error("subscriber url {} is not allowed by the target system", url);
             }
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                     responseBodyMap, null);
@@ -186,8 +182,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
                         .get(consumerGroup + "@" + subTopic.getTopic());
 
                 if (CollectionUtils.isEmpty(groupTopicClients)) {
-                    if (LOGGER.isErrorEnabled()) {
-                        LOGGER.error("group {} topic {} clients is empty", consumerGroup, subTopic);
+                    if (log.isErrorEnabled()) {
+                        log.error("group {} topic {} clients is empty", consumerGroup, subTopic);
                     }
                 }
 
@@ -264,8 +260,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
                 handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
 
             } catch (Exception e) {
-                if (LOGGER.isErrorEnabled()) {
-                    LOGGER.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}",
+                if (log.isErrorEnabled()) {
+                    log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}",
                             System.currentTimeMillis() - startTime,
                             JsonUtils.serialize(subscriptionList),
                             url, e);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
index 16eba2479..11c9df06d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
@@ -108,9 +108,7 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
                 }
         )).orElseGet(Maps::newHashMap);
 
-        if (requestBodyMap.get(EventMeshConstants.URL) == null
-                || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null
-                || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null) {
+        if (validatedRequestBodyMap(requestBodyMap)) {
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                     responseBodyMap, null);
             return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java
index 1dc1ce043..503ac87dd 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java
@@ -25,27 +25,20 @@ import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
 import org.apache.eventmesh.common.protocol.http.common.RequestURI;
 import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.common.utils.JsonUtils;
-import org.apache.eventmesh.common.utils.ThreadUtils;
 import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.common.EventMeshTrace;
+import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.runtime.util.WebhookUtil;
 
-import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -59,10 +52,9 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.HttpRequest;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-@EventMeshTrace(isEnable = false)
+@EventMeshTrace
 public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
 
     public Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
@@ -82,9 +74,9 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
         ChannelHandlerContext ctx = handlerSpecific.getCtx();
 
         HttpEventWrapper requestWrapper = asyncContext.getRequest();
-
+        String localAddress = IPUtils.getLocalAddress();
         httpLogger.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
-            EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress()
+            EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress
         );
 
         // user request header
@@ -116,12 +108,9 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
         Map<String, Object> requestBodyMap = Optional.ofNullable(JsonUtils.deserialize(
             new String(requestBody, Constants.DEFAULT_CHARSET),
             new TypeReference<HashMap<String, Object>>() {}
-        )).orElse(Maps.newHashMap());
-
+        )).orElseGet(Maps::newHashMap);
 
-        if (requestBodyMap.get(EventMeshConstants.URL) == null
-            || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null
-            || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null) {
+        if (validatedRequestBodyMap(requestBodyMap)) {
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                 responseBodyMap, null);
             return;
@@ -136,10 +125,11 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
         List<SubscriptionItem> subscriptionList = Optional.ofNullable(JsonUtils.deserialize(
             topic,
             new TypeReference<List<SubscriptionItem>>() {}
-        )).orElse(Collections.emptyList());
+        )).orElseGet(Collections::emptyList);
 
         //do acl check
-        if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
+        EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
+        if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
             String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
             String user = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.USERNAME).toString();
             String pass = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PASSWD).toString();
@@ -160,8 +150,8 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
 
         // validate URL
         try {
-            if (!IPUtils.isValidDomainOrIp(url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv4BlackList,
-                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
+            if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.eventMeshIpv4BlackList,
+                eventMeshHttpConfiguration.eventMeshIpv6BlackList)) {
                 httpLogger.error("subscriber url {} is not valid", url);
                 handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                     responseBodyMap, null);
@@ -176,7 +166,7 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
 
         // obtain webhook delivery agreement for Abuse Protection
         boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
-            url, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshWebhookOrigin());
+            url, eventMeshHttpConfiguration.getEventMeshWebhookOrigin());
 
         if (!isWebhookAllowed) {
             httpLogger.error("subscriber url {} is not allowed by the target system", url);
@@ -187,33 +177,13 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
 
         long startTime = System.currentTimeMillis();
         try {
-            // request to remote
-
-            String env = eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv();
-            String idc = eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC();
-            String cluster = eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster();
-            String sysId = eventMeshHTTPServer.getEventMeshHttpConfiguration().getSysID();
-            String meshGroup = env + "-" + idc + "-" + cluster + "-" + sysId;
-
-            Map<String, String> remoteHeaderMap = new HashMap<>();
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.ENV, env);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IDC, idc);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IP, IPUtils.getLocalAddress());
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PID, String.valueOf(ThreadUtils.getPID()));
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.SYS, sysId);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.USERNAME, EventMeshConstants.USER_NAME);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PASSWD, EventMeshConstants.PASSWD);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PRODUCERGROUP, meshGroup);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.CONSUMERGROUP, meshGroup);
-
             // local subscription url
-            String localUrl = "http://" + IPUtils.getLocalAddress() + ":"
-                + eventMeshHTTPServer.getEventMeshHttpConfiguration().httpServerPort
+            String localUrl = "http://" + localAddress + ":"
+                + eventMeshHttpConfiguration.httpServerPort
                 + RequestURI.PUBLISH_BRIDGE.getRequestURI();
-
             Map<String, Object> remoteBodyMap = new HashMap<>();
             remoteBodyMap.put(EventMeshConstants.URL, localUrl);
-            remoteBodyMap.put(EventMeshConstants.CONSUMER_GROUP, meshGroup);
+            remoteBodyMap.put(EventMeshConstants.CONSUMER_GROUP, eventMeshHttpConfiguration.getMeshGroup());
             remoteBodyMap.put(EventMeshConstants.MANAGE_TOPIC, requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC));
 
             String targetMesh = requestBodyMap.get("remoteMesh") == null ? "" : requestBodyMap.get("remoteMesh").toString();
@@ -227,13 +197,13 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
 
             CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.httpClientPool.getClient();
 
-            String remoteResult = post(closeableHttpClient, targetMesh, remoteHeaderMap, remoteBodyMap,
+            String remoteResult = post(closeableHttpClient, targetMesh, builderRemoteHeaderMap(localAddress), remoteBodyMap,
                 response -> EntityUtils.toString(response.getEntity(), Constants.DEFAULT_CHARSET));
 
             Map<String, String> remoteResultMap = Optional.ofNullable(JsonUtils.deserialize(
                 remoteResult,
                 new TypeReference<Map<String, String>>() {}
-            )).orElse(Maps.newHashMap());
+            )).orElseGet(Maps::newHashMap);
 
             if (String.valueOf(EventMeshRetCode.SUCCESS.getRetCode()).equals(remoteResultMap.get(EventMeshConstants.RET_CODE))) {
                 responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
@@ -261,40 +231,4 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
         return new String[] {RequestURI.SUBSCRIBE_REMOTE.getRequestURI()};
     }
 
-    public static String post(CloseableHttpClient client, String uri,
-                              Map<String, String> requestHeader, Map<String, Object> requestBody,
-                              ResponseHandler<String> responseHandler) throws IOException {
-        Preconditions.checkState(client != null, "client can't be null");
-        Preconditions.checkState(StringUtils.isNotBlank(uri), "uri can't be null");
-        Preconditions.checkState(requestHeader != null, "requestParam can't be null");
-        Preconditions.checkState(responseHandler != null, "responseHandler can't be null");
-
-        HttpPost httpPost = new HttpPost(uri);
-
-        httpPost.addHeader("Content-Type", ContentType.APPLICATION_JSON.getMimeType());
-
-        //header
-        if (MapUtils.isNotEmpty(requestHeader)) {
-            for (Map.Entry<String, String> entry : requestHeader.entrySet()) {
-                httpPost.addHeader(entry.getKey(), entry.getValue());
-            }
-        }
-
-        //body
-        if (MapUtils.isNotEmpty(requestBody)) {
-            String jsonStr = Optional.ofNullable(JsonUtils.serialize(requestBody)).orElse("");
-            httpPost.setEntity(new StringEntity(jsonStr, ContentType.APPLICATION_JSON));
-        }
-
-        //ttl
-        RequestConfig.Builder configBuilder = RequestConfig.custom();
-        configBuilder.setSocketTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
-            .setConnectTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
-            .setConnectionRequestTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)));
-
-        httpPost.setConfig(configBuilder.build());
-
-        return client.execute(httpPost, responseHandler);
-    }
-
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java
index af991063d..c6b5b2ad4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java
@@ -17,18 +17,14 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.processor;
 
-import static org.apache.eventmesh.runtime.constants.EventMeshConstants.CONTENT_TYPE;
-
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
 import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
 import org.apache.eventmesh.common.protocol.http.common.RequestURI;
-import org.apache.eventmesh.common.utils.AssertUtils;
 import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.common.utils.JsonUtils;
-import org.apache.eventmesh.common.utils.ThreadUtils;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.common.EventMeshTrace;
 import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
@@ -37,17 +33,10 @@ import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 
-import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -85,8 +74,9 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
 
         HttpEventWrapper requestWrapper = asyncContext.getRequest();
 
+        String localAddress = IPUtils.getLocalAddress();
         httpLogger.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
-            EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress()
+            EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress
         );
 
         // user request header
@@ -103,6 +93,7 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
         Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
 
         Map<String, Object> responseBodyMap = new HashMap<>();
+        
 
         //validate header
         if (validateSysHeader(sysHeaderMap)) {
@@ -117,11 +108,9 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
         Map<String, Object> requestBodyMap = Optional.ofNullable(JsonUtils.deserialize(
             new String(requestBody, Constants.DEFAULT_CHARSET),
             new TypeReference<HashMap<String, Object>>() {}
-        )).orElse(Maps.newHashMap());
+        )).orElseGet(Maps::newHashMap);
 
-        if (requestBodyMap.get(EventMeshConstants.URL) == null
-            || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null
-            || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null) {
+        if (validatedRequestBodyMap(requestBodyMap)) {
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                 responseBodyMap, null);
             return;
@@ -141,19 +130,10 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
             String sysId = eventMeshHttpConfiguration.getSysID();
             String meshGroup = String.join("-", env, idc, cluster, sysId);
 
-            Map<String, String> remoteHeaderMap = new HashMap<>();
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.ENV, env);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IDC, idc);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IP, IPUtils.getLocalAddress());
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PID, String.valueOf(ThreadUtils.getPID()));
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.SYS, sysId);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.USERNAME, EventMeshConstants.USER_NAME);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PASSWD, EventMeshConstants.PASSWD);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PRODUCERGROUP, meshGroup);
-            remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.CONSUMERGROUP, meshGroup);
+  
 
             // local unSubscription url
-            String unsubscribeUrl = "http://" + IPUtils.getLocalAddress() + ":"
+            String unsubscribeUrl = "http://" + localAddress + ":"
                 + eventMeshHttpConfiguration.httpServerPort
                 + RequestURI.PUBLISH_BRIDGE.getRequestURI();
 
@@ -185,7 +165,7 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
 
             CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.httpClientPool.getClient();
 
-            String remoteResult = post(closeableHttpClient, targetMesh, remoteHeaderMap, remoteBodyMap,
+            String remoteResult = post(closeableHttpClient, targetMesh, builderRemoteHeaderMap(localAddress), remoteBodyMap,
                 response -> EntityUtils.toString(response.getEntity(), Constants.DEFAULT_CHARSET));
 
             Map<String, String> remoteResultMap = Optional.ofNullable(JsonUtils.deserialize(
@@ -217,38 +197,6 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
         return new String[] {RequestURI.UNSUBSCRIBE_REMOTE.getRequestURI()};
     }
 
-    public static String post(CloseableHttpClient client, String uri,
-                              Map<String, String> requestHeader, Map<String, Object> requestBody,
-                              ResponseHandler<String> responseHandler) throws IOException {
-        AssertUtils.notNull(client, "client can't be null");
-        AssertUtils.notBlack(uri, "uri can't be null");
-        AssertUtils.notNull(requestHeader, "requestParam can't be null");
-        AssertUtils.notNull(responseHandler, "responseHandler can't be null");
-
-        HttpPost httpPost = new HttpPost(uri);
-
-        httpPost.addHeader(CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
-
-        //header
-        if (MapUtils.isNotEmpty(requestHeader)) {
-            requestHeader.forEach(httpPost::addHeader);
-        }
-
-        //body
-        if (MapUtils.isNotEmpty(requestBody)) {
-            String jsonStr = Optional.ofNullable(JsonUtils.serialize(requestBody)).orElse("");
-            httpPost.setEntity(new StringEntity(jsonStr, ContentType.APPLICATION_JSON));
-        }
-
-        //ttl
-        RequestConfig.Builder configBuilder = RequestConfig.custom();
-        configBuilder.setSocketTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
-            .setConnectTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
-            .setConnectionRequestTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)));
-
-        httpPost.setConfig(configBuilder.build());
-
-        return client.execute(httpPost, responseHandler);
-    }
+    
 
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
index 02e022214..4cc65a77b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
@@ -17,17 +17,23 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.processor.inf;
 
+import static org.apache.eventmesh.runtime.constants.EventMeshConstants.CONTENT_TYPE;
+
 import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo;
+import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.config.CommonConfiguration;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.apache.eventmesh.common.utils.AssertUtils;
 import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
 import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.common.utils.ThreadUtils;
 import org.apache.eventmesh.registry.nacos.constant.NacosConstant;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupMetadata;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
@@ -35,8 +41,16 @@ import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicMetadat
 import org.apache.eventmesh.runtime.core.protocol.http.processor.AsyncHttpProcessor;
 import org.apache.eventmesh.runtime.registry.Registry;
 
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -182,4 +196,83 @@ public abstract class AbstractEventProcessor implements AsyncHttpProcessor {
             sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())
             || !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
     }
+
+    /**
+     * validation requestBodyMap key url topic consumerGroup is any null
+     * @param requestBodyMap requestBodyMap
+     * @return any null then true
+     */
+    protected boolean validatedRequestBodyMap(Map<String, Object> requestBodyMap) {
+        return requestBodyMap.get(EventMeshConstants.URL) == null
+            || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null
+            || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null;
+        
+    }
+
+    /**
+     * builder RemoteHeaderMap
+     * @param localAddress
+     * @return
+     */
+    protected Map<String, String> builderRemoteHeaderMap(String localAddress) {
+        EventMeshHTTPConfiguration eventMeshHttpConfiguration = this.eventMeshHTTPServer.getEventMeshHttpConfiguration();
+        String meshGroup = eventMeshHttpConfiguration.getMeshGroup();
+
+        Map<String, String> remoteHeaderMap = new HashMap<>();
+        remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpConfiguration.getEventMeshEnv());
+        remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IDC, eventMeshHttpConfiguration.getEventMeshIDC());
+        remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IP, localAddress);
+        remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PID, String.valueOf(ThreadUtils.getPID()));
+        remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpConfiguration.getSysID());
+        remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.USERNAME, EventMeshConstants.USER_NAME);
+        remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PASSWD, EventMeshConstants.PASSWD);
+        remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PRODUCERGROUP, meshGroup);
+        remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.CONSUMERGROUP, meshGroup);
+        return remoteHeaderMap;
+    }
+
+    /**
+     *  http post
+     * @param client client
+     * @param uri uri
+     * @param requestHeader requestHeader
+     * @param requestBody requestBody
+     * @param responseHandler responseHandler
+     * @return string
+     * @throws IOException
+     */
+    public static String post(CloseableHttpClient client, String uri,
+                              Map<String, String> requestHeader, Map<String, Object> requestBody,
+                              ResponseHandler<String> responseHandler) throws IOException {
+        AssertUtils.notNull(client, "client can't be null");
+        AssertUtils.notBlack(uri, "uri can't be null");
+        AssertUtils.notNull(requestHeader, "requestParam can't be null");
+        AssertUtils.notNull(responseHandler, "responseHandler can't be null");
+
+        HttpPost httpPost = new HttpPost(uri);
+
+        httpPost.addHeader(CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
+
+        //header
+        if (MapUtils.isNotEmpty(requestHeader)) {
+            requestHeader.forEach(httpPost::addHeader);
+        }
+
+        //body
+        if (MapUtils.isNotEmpty(requestBody)) {
+            String jsonStr = Optional.ofNullable(JsonUtils.serialize(requestBody)).orElse("");
+            httpPost.setEntity(new StringEntity(jsonStr, ContentType.APPLICATION_JSON));
+        }
+
+        //ttl
+        RequestConfig.Builder configBuilder = RequestConfig.custom();
+        configBuilder.setSocketTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
+            .setConnectTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
+            .setConnectionRequestTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)));
+
+        httpPost.setConfig(configBuilder.build());
+
+        return client.execute(httpPost, responseHandler);
+    }
+    
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org