You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/01/05 20:17:24 UTC
[incubator-eventmesh] branch master updated: [ISSUE #2668] Method appears to call the same method on the same object redundantly [RemoteSubscribeEventProcessor] (#2816)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new e808aa269 [ISSUE #2668] Method appears to call the same method on the same object redundantly [RemoteSubscribeEventProcessor] (#2816)
e808aa269 is described below
commit e808aa269b4a9ebe65a47531ae8fd3368aba1c1d
Author: weihubeats <we...@163.com>
AuthorDate: Fri Jan 6 04:17:18 2023 +0800
[ISSUE #2668] Method appears to call the same method on the same object redundantly [RemoteSubscribeEventProcessor] (#2816)
* Simplify the code
* Simplify the code
---
.../common/config/CommonConfiguration.java | 4 +
.../processor/LocalSubscribeEventProcessor.java | 44 +++++----
.../processor/LocalUnSubscribeEventProcessor.java | 4 +-
.../processor/RemoteSubscribeEventProcessor.java | 100 ++++-----------------
.../processor/RemoteUnSubscribeEventProcessor.java | 70 ++-------------
.../http/processor/inf/AbstractEventProcessor.java | 93 +++++++++++++++++++
6 files changed, 144 insertions(+), 171 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 62d2330c3..e0ed6960b 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -67,6 +67,10 @@ public class CommonConfiguration {
public CommonConfiguration(ConfigurationWrapper configurationWrapper) {
this.configurationWrapper = configurationWrapper;
}
+
+ public String getMeshGroup() {
+ return String.join("-", this.eventMeshEnv, this.eventMeshCluster, this.sysID);
+ }
public void init() {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
index 5b3fe361b..afc2cad28 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
@@ -48,20 +48,18 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpRequest;
import com.fasterxml.jackson.core.type.TypeReference;
+import lombok.extern.slf4j.Slf4j;
-@EventMeshTrace(isEnable = false)
-public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(LocalSubscribeEventProcessor.class);
+@EventMeshTrace
+@Slf4j
+public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
+
public LocalSubscribeEventProcessor(final EventMeshHTTPServer eventMeshHTTPServer) {
super(eventMeshHTTPServer);
}
@@ -73,8 +71,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
final Channel channel = handlerSpecific.getCtx().channel();
final HttpEventWrapper requestWrapper = handlerSpecific.getAsyncContext().getRequest();
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
+ if (log.isInfoEnabled()) {
+ log.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channel),
IPUtils.getLocalAddress());
}
@@ -103,9 +101,7 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
}
)).orElseGet(HashMap::new);
- if (requestBodyMap.get("url") == null
- || requestBodyMap.get("topic") == null
- || requestBodyMap.get("consumerGroup") == null) {
+ if (validatedRequestBodyMap(requestBodyMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
responseBodyMap, null);
return;
@@ -133,8 +129,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
item.getTopic(),
requestWrapper.getRequestURI());
} catch (Exception e) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+ if (log.isWarnEnabled()) {
+ log.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
}
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap,
@@ -148,8 +144,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
try {
if (!IPUtils.isValidDomainOrIp(url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv4BlackList,
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error("subscriber url {} is not valid", url);
+ if (log.isErrorEnabled()) {
+ log.error("subscriber url {} is not valid", url);
}
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
@@ -157,8 +153,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
return;
}
} catch (Exception e) {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error("subscriber url {} is not valid", url, e);
+ if (log.isErrorEnabled()) {
+ log.error("subscriber url {} is not valid", url, e);
}
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
@@ -169,8 +165,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
// obtain webhook delivery agreement for Abuse Protection
if (!WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
url, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshWebhookOrigin())) {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error("subscriber url {} is not allowed by the target system", url);
+ if (log.isErrorEnabled()) {
+ log.error("subscriber url {} is not allowed by the target system", url);
}
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
responseBodyMap, null);
@@ -186,8 +182,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
.get(consumerGroup + "@" + subTopic.getTopic());
if (CollectionUtils.isEmpty(groupTopicClients)) {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error("group {} topic {} clients is empty", consumerGroup, subTopic);
+ if (log.isErrorEnabled()) {
+ log.error("group {} topic {} clients is empty", consumerGroup, subTopic);
}
}
@@ -264,8 +260,8 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
} catch (Exception e) {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}",
+ if (log.isErrorEnabled()) {
+ log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}",
System.currentTimeMillis() - startTime,
JsonUtils.serialize(subscriptionList),
url, e);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
index 16eba2479..11c9df06d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
@@ -108,9 +108,7 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
}
)).orElseGet(Maps::newHashMap);
- if (requestBodyMap.get(EventMeshConstants.URL) == null
- || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null
- || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null) {
+ if (validatedRequestBodyMap(requestBodyMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
responseBodyMap, null);
return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java
index 1dc1ce043..503ac87dd 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java
@@ -25,27 +25,20 @@ import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestURI;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
-import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.common.EventMeshTrace;
+import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.WebhookUtil;
-import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -59,10 +52,9 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpRequest;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-@EventMeshTrace(isEnable = false)
+@EventMeshTrace
public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
public Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
@@ -82,9 +74,9 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
ChannelHandlerContext ctx = handlerSpecific.getCtx();
HttpEventWrapper requestWrapper = asyncContext.getRequest();
-
+ String localAddress = IPUtils.getLocalAddress();
httpLogger.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
- EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress()
+ EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress
);
// user request header
@@ -116,12 +108,9 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
Map<String, Object> requestBodyMap = Optional.ofNullable(JsonUtils.deserialize(
new String(requestBody, Constants.DEFAULT_CHARSET),
new TypeReference<HashMap<String, Object>>() {}
- )).orElse(Maps.newHashMap());
-
+ )).orElseGet(Maps::newHashMap);
- if (requestBodyMap.get(EventMeshConstants.URL) == null
- || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null
- || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null) {
+ if (validatedRequestBodyMap(requestBodyMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
responseBodyMap, null);
return;
@@ -136,10 +125,11 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
List<SubscriptionItem> subscriptionList = Optional.ofNullable(JsonUtils.deserialize(
topic,
new TypeReference<List<SubscriptionItem>>() {}
- )).orElse(Collections.emptyList());
+ )).orElseGet(Collections::emptyList);
//do acl check
- if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
+ EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
+ if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
String user = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.USERNAME).toString();
String pass = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PASSWD).toString();
@@ -160,8 +150,8 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
// validate URL
try {
- if (!IPUtils.isValidDomainOrIp(url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv4BlackList,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
+ if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.eventMeshIpv4BlackList,
+ eventMeshHttpConfiguration.eventMeshIpv6BlackList)) {
httpLogger.error("subscriber url {} is not valid", url);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
responseBodyMap, null);
@@ -176,7 +166,7 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
// obtain webhook delivery agreement for Abuse Protection
boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
- url, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshWebhookOrigin());
+ url, eventMeshHttpConfiguration.getEventMeshWebhookOrigin());
if (!isWebhookAllowed) {
httpLogger.error("subscriber url {} is not allowed by the target system", url);
@@ -187,33 +177,13 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
long startTime = System.currentTimeMillis();
try {
- // request to remote
-
- String env = eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv();
- String idc = eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC();
- String cluster = eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster();
- String sysId = eventMeshHTTPServer.getEventMeshHttpConfiguration().getSysID();
- String meshGroup = env + "-" + idc + "-" + cluster + "-" + sysId;
-
- Map<String, String> remoteHeaderMap = new HashMap<>();
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.ENV, env);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IP, IPUtils.getLocalAddress());
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PID, String.valueOf(ThreadUtils.getPID()));
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.SYS, sysId);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.USERNAME, EventMeshConstants.USER_NAME);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PASSWD, EventMeshConstants.PASSWD);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PRODUCERGROUP, meshGroup);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.CONSUMERGROUP, meshGroup);
-
// local subscription url
- String localUrl = "http://" + IPUtils.getLocalAddress() + ":"
- + eventMeshHTTPServer.getEventMeshHttpConfiguration().httpServerPort
+ String localUrl = "http://" + localAddress + ":"
+ + eventMeshHttpConfiguration.httpServerPort
+ RequestURI.PUBLISH_BRIDGE.getRequestURI();
-
Map<String, Object> remoteBodyMap = new HashMap<>();
remoteBodyMap.put(EventMeshConstants.URL, localUrl);
- remoteBodyMap.put(EventMeshConstants.CONSUMER_GROUP, meshGroup);
+ remoteBodyMap.put(EventMeshConstants.CONSUMER_GROUP, eventMeshHttpConfiguration.getMeshGroup());
remoteBodyMap.put(EventMeshConstants.MANAGE_TOPIC, requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC));
String targetMesh = requestBodyMap.get("remoteMesh") == null ? "" : requestBodyMap.get("remoteMesh").toString();
@@ -227,13 +197,13 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.httpClientPool.getClient();
- String remoteResult = post(closeableHttpClient, targetMesh, remoteHeaderMap, remoteBodyMap,
+ String remoteResult = post(closeableHttpClient, targetMesh, builderRemoteHeaderMap(localAddress), remoteBodyMap,
response -> EntityUtils.toString(response.getEntity(), Constants.DEFAULT_CHARSET));
Map<String, String> remoteResultMap = Optional.ofNullable(JsonUtils.deserialize(
remoteResult,
new TypeReference<Map<String, String>>() {}
- )).orElse(Maps.newHashMap());
+ )).orElseGet(Maps::newHashMap);
if (String.valueOf(EventMeshRetCode.SUCCESS.getRetCode()).equals(remoteResultMap.get(EventMeshConstants.RET_CODE))) {
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
@@ -261,40 +231,4 @@ public class RemoteSubscribeEventProcessor extends AbstractEventProcessor {
return new String[] {RequestURI.SUBSCRIBE_REMOTE.getRequestURI()};
}
- public static String post(CloseableHttpClient client, String uri,
- Map<String, String> requestHeader, Map<String, Object> requestBody,
- ResponseHandler<String> responseHandler) throws IOException {
- Preconditions.checkState(client != null, "client can't be null");
- Preconditions.checkState(StringUtils.isNotBlank(uri), "uri can't be null");
- Preconditions.checkState(requestHeader != null, "requestParam can't be null");
- Preconditions.checkState(responseHandler != null, "responseHandler can't be null");
-
- HttpPost httpPost = new HttpPost(uri);
-
- httpPost.addHeader("Content-Type", ContentType.APPLICATION_JSON.getMimeType());
-
- //header
- if (MapUtils.isNotEmpty(requestHeader)) {
- for (Map.Entry<String, String> entry : requestHeader.entrySet()) {
- httpPost.addHeader(entry.getKey(), entry.getValue());
- }
- }
-
- //body
- if (MapUtils.isNotEmpty(requestBody)) {
- String jsonStr = Optional.ofNullable(JsonUtils.serialize(requestBody)).orElse("");
- httpPost.setEntity(new StringEntity(jsonStr, ContentType.APPLICATION_JSON));
- }
-
- //ttl
- RequestConfig.Builder configBuilder = RequestConfig.custom();
- configBuilder.setSocketTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
- .setConnectTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
- .setConnectionRequestTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)));
-
- httpPost.setConfig(configBuilder.build());
-
- return client.execute(httpPost, responseHandler);
- }
-
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java
index af991063d..c6b5b2ad4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java
@@ -17,18 +17,14 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;
-import static org.apache.eventmesh.runtime.constants.EventMeshConstants.CONTENT_TYPE;
-
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestURI;
-import org.apache.eventmesh.common.utils.AssertUtils;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
-import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.common.EventMeshTrace;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
@@ -37,17 +33,10 @@ import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor;
import org.apache.eventmesh.runtime.util.RemotingHelper;
-import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -85,8 +74,9 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
HttpEventWrapper requestWrapper = asyncContext.getRequest();
+ String localAddress = IPUtils.getLocalAddress();
httpLogger.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
- EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress()
+ EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress
);
// user request header
@@ -103,6 +93,7 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
Map<String, Object> responseBodyMap = new HashMap<>();
+
//validate header
if (validateSysHeader(sysHeaderMap)) {
@@ -117,11 +108,9 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
Map<String, Object> requestBodyMap = Optional.ofNullable(JsonUtils.deserialize(
new String(requestBody, Constants.DEFAULT_CHARSET),
new TypeReference<HashMap<String, Object>>() {}
- )).orElse(Maps.newHashMap());
+ )).orElseGet(Maps::newHashMap);
- if (requestBodyMap.get(EventMeshConstants.URL) == null
- || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null
- || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null) {
+ if (validatedRequestBodyMap(requestBodyMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
responseBodyMap, null);
return;
@@ -141,19 +130,10 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
String sysId = eventMeshHttpConfiguration.getSysID();
String meshGroup = String.join("-", env, idc, cluster, sysId);
- Map<String, String> remoteHeaderMap = new HashMap<>();
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.ENV, env);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IP, IPUtils.getLocalAddress());
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PID, String.valueOf(ThreadUtils.getPID()));
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.SYS, sysId);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.USERNAME, EventMeshConstants.USER_NAME);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PASSWD, EventMeshConstants.PASSWD);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PRODUCERGROUP, meshGroup);
- remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.CONSUMERGROUP, meshGroup);
+
// local unSubscription url
- String unsubscribeUrl = "http://" + IPUtils.getLocalAddress() + ":"
+ String unsubscribeUrl = "http://" + localAddress + ":"
+ eventMeshHttpConfiguration.httpServerPort
+ RequestURI.PUBLISH_BRIDGE.getRequestURI();
@@ -185,7 +165,7 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.httpClientPool.getClient();
- String remoteResult = post(closeableHttpClient, targetMesh, remoteHeaderMap, remoteBodyMap,
+ String remoteResult = post(closeableHttpClient, targetMesh, builderRemoteHeaderMap(localAddress), remoteBodyMap,
response -> EntityUtils.toString(response.getEntity(), Constants.DEFAULT_CHARSET));
Map<String, String> remoteResultMap = Optional.ofNullable(JsonUtils.deserialize(
@@ -217,38 +197,6 @@ public class RemoteUnSubscribeEventProcessor extends AbstractEventProcessor {
return new String[] {RequestURI.UNSUBSCRIBE_REMOTE.getRequestURI()};
}
- public static String post(CloseableHttpClient client, String uri,
- Map<String, String> requestHeader, Map<String, Object> requestBody,
- ResponseHandler<String> responseHandler) throws IOException {
- AssertUtils.notNull(client, "client can't be null");
- AssertUtils.notBlack(uri, "uri can't be null");
- AssertUtils.notNull(requestHeader, "requestParam can't be null");
- AssertUtils.notNull(responseHandler, "responseHandler can't be null");
-
- HttpPost httpPost = new HttpPost(uri);
-
- httpPost.addHeader(CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
-
- //header
- if (MapUtils.isNotEmpty(requestHeader)) {
- requestHeader.forEach(httpPost::addHeader);
- }
-
- //body
- if (MapUtils.isNotEmpty(requestBody)) {
- String jsonStr = Optional.ofNullable(JsonUtils.serialize(requestBody)).orElse("");
- httpPost.setEntity(new StringEntity(jsonStr, ContentType.APPLICATION_JSON));
- }
-
- //ttl
- RequestConfig.Builder configBuilder = RequestConfig.custom();
- configBuilder.setSocketTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
- .setConnectTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
- .setConnectionRequestTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)));
-
- httpPost.setConfig(configBuilder.build());
-
- return client.execute(httpPost, responseHandler);
- }
+
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
index 02e022214..4cc65a77b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java
@@ -17,17 +17,23 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor.inf;
+import static org.apache.eventmesh.runtime.constants.EventMeshConstants.CONTENT_TYPE;
+
import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo;
+import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.apache.eventmesh.common.utils.AssertUtils;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.registry.nacos.constant.NacosConstant;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupMetadata;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
@@ -35,8 +41,16 @@ import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicMetadat
import org.apache.eventmesh.runtime.core.protocol.http.processor.AsyncHttpProcessor;
import org.apache.eventmesh.runtime.registry.Registry;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -182,4 +196,83 @@ public abstract class AbstractEventProcessor implements AsyncHttpProcessor {
sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
}
+
+ /**
+ * validation requestBodyMap key url topic consumerGroup is any null
+ * @param requestBodyMap requestBodyMap
+ * @return any null then true
+ */
+ protected boolean validatedRequestBodyMap(Map<String, Object> requestBodyMap) {
+ return requestBodyMap.get(EventMeshConstants.URL) == null
+ || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null
+ || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null;
+
+ }
+
+ /**
+ * builder RemoteHeaderMap
+ * @param localAddress
+ * @return
+ */
+ protected Map<String, String> builderRemoteHeaderMap(String localAddress) {
+ EventMeshHTTPConfiguration eventMeshHttpConfiguration = this.eventMeshHTTPServer.getEventMeshHttpConfiguration();
+ String meshGroup = eventMeshHttpConfiguration.getMeshGroup();
+
+ Map<String, String> remoteHeaderMap = new HashMap<>();
+ remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpConfiguration.getEventMeshEnv());
+ remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IDC, eventMeshHttpConfiguration.getEventMeshIDC());
+ remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.IP, localAddress);
+ remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PID, String.valueOf(ThreadUtils.getPID()));
+ remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpConfiguration.getSysID());
+ remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.USERNAME, EventMeshConstants.USER_NAME);
+ remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PASSWD, EventMeshConstants.PASSWD);
+ remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.PRODUCERGROUP, meshGroup);
+ remoteHeaderMap.put(ProtocolKey.ClientInstanceKey.CONSUMERGROUP, meshGroup);
+ return remoteHeaderMap;
+ }
+
+ /**
+ * http post
+ * @param client client
+ * @param uri uri
+ * @param requestHeader requestHeader
+ * @param requestBody requestBody
+ * @param responseHandler responseHandler
+ * @return string
+ * @throws IOException
+ */
+ public static String post(CloseableHttpClient client, String uri,
+ Map<String, String> requestHeader, Map<String, Object> requestBody,
+ ResponseHandler<String> responseHandler) throws IOException {
+ AssertUtils.notNull(client, "client can't be null");
+ AssertUtils.notBlack(uri, "uri can't be null");
+ AssertUtils.notNull(requestHeader, "requestParam can't be null");
+ AssertUtils.notNull(responseHandler, "responseHandler can't be null");
+
+ HttpPost httpPost = new HttpPost(uri);
+
+ httpPost.addHeader(CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
+
+ //header
+ if (MapUtils.isNotEmpty(requestHeader)) {
+ requestHeader.forEach(httpPost::addHeader);
+ }
+
+ //body
+ if (MapUtils.isNotEmpty(requestBody)) {
+ String jsonStr = Optional.ofNullable(JsonUtils.serialize(requestBody)).orElse("");
+ httpPost.setEntity(new StringEntity(jsonStr, ContentType.APPLICATION_JSON));
+ }
+
+ //ttl
+ RequestConfig.Builder configBuilder = RequestConfig.custom();
+ configBuilder.setSocketTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
+ .setConnectTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)))
+ .setConnectionRequestTimeout(Integer.parseInt(String.valueOf(Constants.DEFAULT_HTTP_TIME_OUT)));
+
+ httpPost.setConfig(configBuilder.build());
+
+ return client.execute(httpPost, responseHandler);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org