You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/02 05:34:48 UTC
[incubator-eventmesh] branch master updated: fix issue2654
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new a140e18b3 fix issue2654
new 5d2de1609 Merge pull request #2772 from jonyangx/issue2654
a140e18b3 is described below
commit a140e18b3d5d8f2c7381af726d35a1d7f20c81cc
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sun Jan 1 09:27:52 2023 +0800
fix issue2654
---
.../http/consumer/HttpClientGroupMapping.java | 48 ++---
.../http/processor/HeartBeatProcessor.java | 34 ++--
.../processor/LocalSubscribeEventProcessor.java | 224 ++++++++++-----------
.../processor/LocalUnSubscribeEventProcessor.java | 44 ++--
.../http/processor/SubscribeProcessor.java | 34 ++--
.../http/processor/UnSubscribeProcessor.java | 42 ++--
.../core/protocol/http/processor/inf/Client.java | 111 +++++++++-
7 files changed, 310 insertions(+), 227 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
index 301cd1e6c..582c61d39 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
@@ -322,23 +322,23 @@ public class HttpClientGroupMapping {
final List<SubscriptionItem> subscriptionItems, String url) {
for (SubscriptionItem item : subscriptionItems) {
Client client = new Client();
- client.env = subscribeRequestHeader.getEnv();
- client.idc = subscribeRequestHeader.getIdc();
- client.sys = subscribeRequestHeader.getSys();
- client.ip = subscribeRequestHeader.getIp();
- client.pid = subscribeRequestHeader.getPid();
- client.consumerGroup = consumerGroup;
- client.topic = item.getTopic();
- client.url = url;
- client.lastUpTime = new Date();
- String groupTopicKey = client.consumerGroup + "@" + client.topic;
+ client.setEnv(subscribeRequestHeader.getEnv());
+ client.setIdc(subscribeRequestHeader.getIdc());
+ client.setSys(subscribeRequestHeader.getSys());
+ client.setIp(subscribeRequestHeader.getIp());
+ client.setPid(subscribeRequestHeader.getPid());
+ client.setConsumerGroup(consumerGroup);
+ client.setTopic(item.getTopic());
+ client.setUrl(url);
+ client.setLastUpTime(new Date());
+ String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
if (localClientInfoMapping.containsKey(groupTopicKey)) {
List<Client> localClients = localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
- if (StringUtils.equals(localClient.url, client.url)) {
+ if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
isContains = true;
- localClient.lastUpTime = client.lastUpTime;
+ localClient.setLastUpTime(client.getLastUpTime());
break;
}
}
@@ -381,25 +381,25 @@ public class HttpClientGroupMapping {
final List<String> topicList, String url) {
for (String topic : topicList) {
Client client = new Client();
- client.env = unSubscribeRequestHeader.getEnv();
- client.idc = unSubscribeRequestHeader.getIdc();
- client.sys = unSubscribeRequestHeader.getSys();
- client.ip = unSubscribeRequestHeader.getIp();
- client.pid = unSubscribeRequestHeader.getPid();
- client.consumerGroup = consumerGroup;
- client.topic = topic;
- client.url = url;
- client.lastUpTime = new Date();
- String groupTopicKey = client.consumerGroup + "@" + client.topic;
+ client.setEnv(unSubscribeRequestHeader.getEnv());
+ client.setIdc(unSubscribeRequestHeader.getIdc());
+ client.setSys(unSubscribeRequestHeader.getSys());
+ client.setIp(unSubscribeRequestHeader.getIp());
+ client.setPid(unSubscribeRequestHeader.getPid());
+ client.setConsumerGroup(consumerGroup);
+ client.setTopic(topic);
+ client.setUrl(url);
+ client.setLastUpTime(new Date());
+ String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
if (localClientInfoMapping.containsKey(groupTopicKey)) {
List<Client> localClients =
localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
- if (StringUtils.equals(localClient.url, client.url)) {
+ if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
isContains = true;
- localClient.lastUpTime = client.lastUpTime;
+ localClient.setLastUpTime(client.getLastUpTime());
break;
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index 083f2167c..f439bcada 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -115,18 +115,18 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
for (HeartbeatRequestBody.HeartbeatEntity heartbeatEntity : heartbeatEntities) {
Client client = new Client();
- client.env = env;
- client.idc = idc;
- client.sys = sys;
- client.ip = ip;
- client.pid = pid;
- client.consumerGroup = consumerGroup;
- client.topic = heartbeatEntity.topic;
- client.url = heartbeatEntity.url;
-
- client.lastUpTime = new Date();
-
- if (StringUtils.isBlank(client.topic)) {
+ client.setEnv(env);
+ client.setIdc(idc);
+ client.setSys(sys);
+ client.setIp(ip);
+ client.setPid(pid);
+ client.setConsumerGroup(consumerGroup);
+ client.setTopic(heartbeatEntity.topic);
+ client.setUrl(heartbeatEntity.url);
+
+ client.setLastUpTime(new Date());
+
+ if (StringUtils.isBlank(client.getTopic())) {
continue;
}
@@ -137,7 +137,7 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
String pass = heartbeatRequestHeader.getPasswd();
int requestCode = Integer.parseInt(heartbeatRequestHeader.getCode());
try {
- Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, client.topic, requestCode);
+ Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, client.getTopic(), requestCode);
} catch (Exception e) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
heartbeatResponseHeader,
@@ -149,11 +149,11 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
}
}
- if (StringUtils.isBlank(client.url)) {
+ if (StringUtils.isBlank(client.getUrl())) {
continue;
}
- String groupTopicKey = client.consumerGroup + "@" + client.topic;
+ String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
if (tmp.containsKey(groupTopicKey)) {
tmp.get(groupTopicKey).add(client);
@@ -219,9 +219,9 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
for (Client tmpClient : tmpClientList) {
boolean isContains = false;
for (Client localClient : localClientList) {
- if (StringUtils.equals(localClient.url, tmpClient.url)) {
+ if (StringUtils.equals(localClient.getUrl(), tmpClient.getUrl())) {
isContains = true;
- localClient.lastUpTime = tmpClient.lastUpTime;
+ localClient.setLastUpTime(tmpClient.getLastUpTime());
break;
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
index 2b2ad88d5..9c93fefb7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
@@ -31,7 +31,6 @@ import org.apache.eventmesh.runtime.common.EventMeshTrace;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
-import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.util.RemotingHelper;
@@ -48,12 +47,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpRequest;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -62,86 +60,85 @@ import com.fasterxml.jackson.core.type.TypeReference;
@EventMeshTrace(isEnable = false)
public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
- public Logger httpLogger = LoggerFactory.getLogger("http");
+ private static final Logger LOGGER = LoggerFactory.getLogger(LocalSubscribeEventProcessor.class);
- public Logger aclLogger = LoggerFactory.getLogger("acl");
-
- public LocalSubscribeEventProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
+ public LocalSubscribeEventProcessor(final EventMeshHTTPServer eventMeshHTTPServer) {
super(eventMeshHTTPServer);
}
@Override
- public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest httpRequest) throws Exception {
-
- AsyncContext<HttpEventWrapper> asyncContext = handlerSpecific.getAsyncContext();
+ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final HttpRequest httpRequest)
+ throws Exception {
- ChannelHandlerContext ctx = handlerSpecific.getCtx();
+ final Channel channel = handlerSpecific.getCtx().channel();
+ final HttpEventWrapper requestWrapper = handlerSpecific.getAsyncContext().getRequest();
- HttpEventWrapper requestWrapper = asyncContext.getRequest();
-
- httpLogger.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
- EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress()
- );
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("uri={}|{}|client2eventMesh|from={}|to={}", requestWrapper.getRequestURI(),
+ EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channel),
+ IPUtils.getLocalAddress());
+ }
// user request header
- Map<String, Object> userRequestHeaderMap = requestWrapper.getHeaderMap();
- String requestIp = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- userRequestHeaderMap.put(ProtocolKey.ClientInstanceKey.IP, requestIp);
-
+ requestWrapper.getHeaderMap().put(ProtocolKey.ClientInstanceKey.IP,
+ RemotingHelper.parseChannelRemoteAddr(channel));
// build sys header
requestWrapper.buildSysHeaderForClient();
- Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
-
- Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
-
- Map<String, Object> responseBodyMap = new HashMap<>();
+ final Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
+ final Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
+ final Map<String, Object> responseBodyMap = new HashMap<>();
//validate header
if (validateSysHeader(sysHeaderMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
- responseBodyMap, null);
+ responseBodyMap, null);
return;
}
//validate body
- byte[] requestBody = requestWrapper.getBody();
-
- Map<String, Object> requestBodyMap = Optional.ofNullable(JsonUtils.deserialize(
- new String(requestBody, Constants.DEFAULT_CHARSET),
- new TypeReference<HashMap<String, Object>>() {}
- )).orElse(new HashMap<>());
+ final Map<String, Object> requestBodyMap = Optional.ofNullable(JsonUtils.deserialize(
+ new String(requestWrapper.getBody(), Constants.DEFAULT_CHARSET),
+ new TypeReference<HashMap<String, Object>>() {
+ }
+ )).orElseGet(HashMap::new);
- if (requestBodyMap.get("url") == null || requestBodyMap.get("topic") == null || requestBodyMap.get("consumerGroup") == null) {
+ if (requestBodyMap.get("url") == null
+ || requestBodyMap.get("topic") == null
+ || requestBodyMap.get("consumerGroup") == null) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
- responseBodyMap, null);
+ responseBodyMap, null);
return;
}
- String url = requestBodyMap.get("url").toString();
- String consumerGroup = requestBodyMap.get("consumerGroup").toString();
- String topic = JsonUtils.serialize(requestBodyMap.get("topic"));
+ final String url = requestBodyMap.get("url").toString();
+ final String consumerGroup = requestBodyMap.get("consumerGroup").toString();
+ final String topic = JsonUtils.serialize(requestBodyMap.get("topic"));
// SubscriptionItem
- List<SubscriptionItem> subscriptionList = Optional.ofNullable(JsonUtils.deserialize(
- topic,
- new TypeReference<List<SubscriptionItem>>() {}
- )).orElse(Collections.emptyList());
+ final List<SubscriptionItem> subscriptionList = Optional.ofNullable(JsonUtils.deserialize(
+ topic,
+ new TypeReference<List<SubscriptionItem>>() {
+ }
+ )).orElseGet(Collections::emptyList);
//do acl check
if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
- String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- String user = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.USERNAME).toString();
- String pass = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PASSWD).toString();
- String subsystem = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString();
- for (SubscriptionItem item : subscriptionList) {
+ for (final SubscriptionItem item : subscriptionList) {
try {
- Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(),
- requestWrapper.getRequestURI());
+ Acl.doAclCheckInHttpReceive(RemotingHelper.parseChannelRemoteAddr(channel),
+ sysHeaderMap.get(ProtocolKey.ClientInstanceKey.USERNAME).toString(),
+ sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PASSWD).toString(),
+ sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString(),
+ item.getTopic(),
+ requestWrapper.getRequestURI());
} catch (Exception e) {
- aclLogger.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+ }
+
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap,
- responseBodyMap, null);
+ responseBodyMap, null);
return;
}
}
@@ -150,27 +147,25 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
// validate URL
try {
if (!IPUtils.isValidDomainOrIp(url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv4BlackList,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
- httpLogger.error("subscriber url {} is not valid", url);
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
+ LOGGER.error("subscriber url {} is not valid", url);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
- responseBodyMap, null);
+ responseBodyMap, null);
return;
}
} catch (Exception e) {
- httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage());
+ LOGGER.error("subscriber url is invalid, url:" + url, e);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
- responseBodyMap, null);
+ responseBodyMap, null);
return;
}
// obtain webhook delivery agreement for Abuse Protection
- boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
- url, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshWebhookOrigin());
-
- if (!isWebhookAllowed) {
- httpLogger.error("subscriber url {} is not allowed by the target system", url);
+ if (!WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
+ url, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshWebhookOrigin())) {
+ LOGGER.error("subscriber url {} is not allowed by the target system", url);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
- responseBodyMap, null);
+ responseBodyMap, null);
return;
}
@@ -178,47 +173,47 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
registerClient(requestWrapper, consumerGroup, subscriptionList, url);
- for (SubscriptionItem subTopic : subscriptionList) {
- List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
- .get(consumerGroup + "@" + subTopic.getTopic());
+ for (final SubscriptionItem subTopic : subscriptionList) {
+ final List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+ .get(consumerGroup + "@" + subTopic.getTopic());
if (CollectionUtils.isEmpty(groupTopicClients)) {
- httpLogger.error("group {} topic {} clients is empty", consumerGroup, subTopic);
+ LOGGER.error("group {} topic {} clients is empty", consumerGroup, subTopic);
}
- Map<String, List<String>> idcUrls = new HashMap<>();
- for (Client client : groupTopicClients) {
- if (idcUrls.containsKey(client.idc)) {
- idcUrls.get(client.idc).add(StringUtils.deleteWhitespace(client.url));
+ final Map<String, List<String>> idcUrls = new HashMap<>();
+ for (final Client client : groupTopicClients) {
+ if (idcUrls.containsKey(client.getIdc())) {
+ idcUrls.get(client.getIdc()).add(StringUtils.deleteWhitespace(client.getUrl()));
} else {
- List<String> urls = new ArrayList<>();
- urls.add(client.url);
- idcUrls.put(client.idc, urls);
+ final List<String> urls = new ArrayList<>();
+ urls.add(client.getUrl());
+ idcUrls.put(client.getIdc(), urls);
}
}
+
ConsumerGroupConf consumerGroupConf =
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+ eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
if (consumerGroupConf == null) {
// new subscription
consumerGroupConf = new ConsumerGroupConf(consumerGroup);
- ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
+ final ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
consumeTopicConfig.setConsumerGroup(consumerGroup);
consumeTopicConfig.setTopic(subTopic.getTopic());
consumeTopicConfig.setSubscriptionItem(subTopic);
consumeTopicConfig.setUrls(new HashSet<>(Collections.singletonList(url)));
-
consumeTopicConfig.setIdcUrls(idcUrls);
- Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
+ final Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
map.put(subTopic.getTopic(), consumeTopicConfig);
consumerGroupConf.setConsumerGroupTopicConf(map);
} else {
// already subscribed
- Map<String, ConsumerGroupTopicConf> map =
- consumerGroupConf.getConsumerGroupTopicConf();
+ final Map<String, ConsumerGroupTopicConf> map =
+ consumerGroupConf.getConsumerGroupTopicConf();
if (!map.containsKey(subTopic.getTopic())) {
//If there are multiple topics, append it
- ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
+ final ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
newTopicConf.setConsumerGroup(consumerGroup);
newTopicConf.setTopic(subTopic.getTopic());
newTopicConf.setSubscriptionItem(subTopic);
@@ -226,19 +221,19 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
newTopicConf.setIdcUrls(idcUrls);
map.put(subTopic.getTopic(), newTopicConf);
}
- Set<Map.Entry<String, ConsumerGroupTopicConf>> entrySet = map.entrySet();
- for (Map.Entry<String, ConsumerGroupTopicConf> set : entrySet) {
+
+ for (final Map.Entry<String, ConsumerGroupTopicConf> set : map.entrySet()) {
if (!StringUtils.equals(subTopic.getTopic(), set.getKey())) {
continue;
}
- ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
+ final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(subTopic.getTopic());
latestTopicConf.setSubscriptionItem(subTopic);
latestTopicConf.setUrls(new HashSet<>(Collections.singletonList(url)));
- ConsumerGroupTopicConf currentTopicConf = set.getValue();
+ final ConsumerGroupTopicConf currentTopicConf = set.getValue();
latestTopicConf.getUrls().addAll(currentTopicConf.getUrls());
latestTopicConf.setIdcUrls(idcUrls);
@@ -248,24 +243,22 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
}
- long startTime = System.currentTimeMillis();
+ final long startTime = System.currentTimeMillis();
try {
// subscription relationship change notification
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
- eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
-
+ eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
responseBodyMap.put("retCode", EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put("retMsg", EventMeshRetCode.SUCCESS.getErrMsg());
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
} catch (Exception e) {
- long endTime = System.currentTimeMillis();
- httpLogger.error(
- "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}"
- + "|bizSeqNo={}|uniqueId={}", endTime - startTime,
- JsonUtils.serialize(subscriptionList), url, e);
- handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null);
+ LOGGER.error(String.format("message|eventMesh2mq|REQ|ASYNC|send2MQCost=%s ms|topic=%s"
+ + "|uniqueId=%s", System.currentTimeMillis() - startTime,
+ JsonUtils.serialize(subscriptionList), url), e);
+ handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR, responseHeaderMap,
+ responseBodyMap, null);
}
// Update service metadata
@@ -276,42 +269,43 @@ public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
@Override
public String[] paths() {
- return new String[] {RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
+ return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
}
- private void registerClient(HttpEventWrapper requestWrapper, String consumerGroup,
- List<SubscriptionItem> subscriptionItems, String url) {
- Map<String, Object> requestHeaderMap = requestWrapper.getSysHeaderMap();
- for (SubscriptionItem item : subscriptionItems) {
- Client client = new Client();
- client.env = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString();
- client.idc = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString();
- client.sys = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString();
- client.ip = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString();
- client.pid = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString();
- client.consumerGroup = consumerGroup;
- client.topic = item.getTopic();
- client.url = url;
- client.lastUpTime = new Date();
-
- String groupTopicKey = client.consumerGroup + "@" + client.topic;
+ private void registerClient(final HttpEventWrapper requestWrapper, final String consumerGroup,
+ final List<SubscriptionItem> subscriptionItems, final String url) {
+ final Map<String, Object> requestHeaderMap = requestWrapper.getSysHeaderMap();
+ for (final SubscriptionItem item : subscriptionItems) {
+ final Client client = new Client();
+ client.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString());
+ client.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString());
+ client.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString());
+ client.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString());
+ client.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
+ client.setConsumerGroup(consumerGroup);
+ client.setTopic(item.getTopic());
+ client.setUrl(url);
+ client.setLastUpTime(new Date());
+
+ final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
- List<Client> localClients =
- eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+ final List<Client> localClients =
+ eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
- for (Client localClient : localClients) {
- if (StringUtils.equals(localClient.url, client.url)) {
+ for (final Client localClient : localClients) {
+ if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
isContains = true;
- localClient.lastUpTime = client.lastUpTime;
+ localClient.setLastUpTime(client.getLastUpTime());
break;
}
}
+
if (!isContains) {
localClients.add(client);
}
} else {
- List<Client> clients = new ArrayList<>();
+ final List<Client> clients = new ArrayList<>();
clients.add(client);
eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
index 4bdc7f70c..8e901b89e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
@@ -142,8 +142,8 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
Iterator<Client> clientIterator = groupTopicClients.iterator();
while (clientIterator.hasNext()) {
Client client = clientIterator.next();
- if (StringUtils.equals(client.pid, pid)
- && StringUtils.equals(client.url, unSubscribeUrl)) {
+ if (StringUtils.equals(client.getPid(), pid)
+ && StringUtils.equals(client.getUrl(), unSubscribeUrl)) {
httpLogger.warn("client {} start unsubscribe", JsonUtils.serialize(client));
clientIterator.remove();
}
@@ -154,15 +154,15 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
Set<String> clientUrls = new HashSet<>();
for (Client client : groupTopicClients) {
// remove subscribed url
- if (!StringUtils.equals(unSubscribeUrl, client.url)) {
- clientUrls.add(client.url);
- if (idcUrls.containsKey(client.idc)) {
- idcUrls.get(client.idc)
- .add(StringUtils.deleteWhitespace(client.url));
+ if (!StringUtils.equals(unSubscribeUrl, client.getUrl())) {
+ clientUrls.add(client.getUrl());
+ if (idcUrls.containsKey(client.getIdc())) {
+ idcUrls.get(client.getIdc())
+ .add(StringUtils.deleteWhitespace(client.getUrl()));
} else {
List<String> urls = new ArrayList<>();
- urls.add(client.url);
- idcUrls.put(client.idc, urls);
+ urls.add(client.getUrl());
+ idcUrls.put(client.getIdc(), urls);
}
}
@@ -260,25 +260,25 @@ public class LocalUnSubscribeEventProcessor extends AbstractEventProcessor {
Map<String, Object> requestHeaderMap = requestWrapper.getSysHeaderMap();
for (String topic : topicList) {
Client client = new Client();
- client.env = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString();
- client.idc = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString();
- client.sys = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString();
- client.ip = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString();
- client.pid = requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString();
- client.consumerGroup = consumerGroup;
- client.topic = topic;
- client.url = url;
- client.lastUpTime = new Date();
-
- String groupTopicKey = client.consumerGroup + "@" + client.topic;
+ client.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString());
+ client.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString());
+ client.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString());
+ client.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString());
+ client.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
+ client.setConsumerGroup(consumerGroup);
+ client.setTopic(topic);
+ client.setUrl(url);
+ client.setLastUpTime(new Date());
+
+ String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
List<Client> localClients =
eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
- if (StringUtils.equals(localClient.url, client.url)) {
+ if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
isContains = true;
- localClient.lastUpTime = client.lastUpTime;
+ localClient.setLastUpTime(client.getLastUpTime());
break;
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 6d6f54064..ef0d5ad37 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -202,12 +202,12 @@ public class SubscribeProcessor implements HttpRequestProcessor {
Map<String, List<String>> idcUrls = new HashMap<>();
for (Client client : groupTopicClients) {
- if (idcUrls.containsKey(client.idc)) {
- idcUrls.get(client.idc).add(StringUtils.deleteWhitespace(client.url));
+ if (idcUrls.containsKey(client.getIdc())) {
+ idcUrls.get(client.getIdc()).add(StringUtils.deleteWhitespace(client.getUrl()));
} else {
List<String> urls = new ArrayList<>();
- urls.add(client.url);
- idcUrls.put(client.idc, urls);
+ urls.add(client.getUrl());
+ idcUrls.put(client.getIdc(), urls);
}
}
ConsumerGroupConf consumerGroupConf =
@@ -312,26 +312,26 @@ public class SubscribeProcessor implements HttpRequestProcessor {
List<SubscriptionItem> subscriptionItems, String url) {
for (SubscriptionItem item : subscriptionItems) {
Client client = new Client();
- client.env = subscribeRequestHeader.getEnv();
- client.idc = subscribeRequestHeader.getIdc();
- client.sys = subscribeRequestHeader.getSys();
- client.ip = subscribeRequestHeader.getIp();
- client.pid = subscribeRequestHeader.getPid();
- client.consumerGroup = consumerGroup;
- client.topic = item.getTopic();
- client.url = url;
- client.lastUpTime = new Date();
-
- String groupTopicKey = client.consumerGroup + "@" + client.topic;
+ client.setEnv(subscribeRequestHeader.getEnv());
+ client.setIdc(subscribeRequestHeader.getIdc());
+ client.setSys(subscribeRequestHeader.getSys());
+ client.setIp(subscribeRequestHeader.getIp());
+ client.setPid(subscribeRequestHeader.getPid());
+ client.setConsumerGroup(consumerGroup);
+ client.setTopic(item.getTopic());
+ client.setUrl(url);
+ client.setLastUpTime(new Date());
+
+ String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
List<Client> localClients =
eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
- if (StringUtils.equals(localClient.url, client.url)) {
+ if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
isContains = true;
- localClient.lastUpTime = client.lastUpTime;
+ localClient.setLastUpTime(client.getLastUpTime());
break;
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index d428df902..419f28d22 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -148,8 +148,8 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
Iterator<Client> clientIterator = groupTopicClients.iterator();
while (clientIterator.hasNext()) {
Client client = clientIterator.next();
- if (StringUtils.equals(client.pid, pid)
- && StringUtils.equals(client.url, unSubscribeUrl)) {
+ if (StringUtils.equals(client.getPid(), pid)
+ && StringUtils.equals(client.getUrl(), unSubscribeUrl)) {
httpLogger.warn("client {} start unsubscribe", JsonUtils.serialize(client));
clientIterator.remove();
}
@@ -160,15 +160,15 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
Set<String> clientUrls = new HashSet<>();
for (Client client : groupTopicClients) {
// remove subscribed url
- if (!StringUtils.equals(unSubscribeUrl, client.url)) {
- clientUrls.add(client.url);
- if (idcUrls.containsKey(client.idc)) {
- idcUrls.get(client.idc)
- .add(StringUtils.deleteWhitespace(client.url));
+ if (!StringUtils.equals(unSubscribeUrl, client.getUrl())) {
+ clientUrls.add(client.getUrl());
+ if (idcUrls.containsKey(client.getIdc())) {
+ idcUrls.get(client.getIdc())
+ .add(StringUtils.deleteWhitespace(client.getUrl()));
} else {
List<String> urls = new ArrayList<>();
- urls.add(client.url);
- idcUrls.put(client.idc, urls);
+ urls.add(client.getUrl());
+ idcUrls.put(client.getIdc(), urls);
}
}
@@ -275,25 +275,25 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
List<String> topicList, String url) {
for (String topic : topicList) {
Client client = new Client();
- client.env = unSubscribeRequestHeader.getEnv();
- client.idc = unSubscribeRequestHeader.getIdc();
- client.sys = unSubscribeRequestHeader.getSys();
- client.ip = unSubscribeRequestHeader.getIp();
- client.pid = unSubscribeRequestHeader.getPid();
- client.consumerGroup = consumerGroup;
- client.topic = topic;
- client.url = url;
- client.lastUpTime = new Date();
+ client.setEnv(unSubscribeRequestHeader.getEnv());
+ client.setIdc(unSubscribeRequestHeader.getIdc());
+ client.setSys(unSubscribeRequestHeader.getSys());
+ client.setIp(unSubscribeRequestHeader.getIp());
+ client.setPid(unSubscribeRequestHeader.getPid());
+ client.setConsumerGroup(consumerGroup);
+ client.setTopic(topic);
+ client.setUrl(url);
+ client.setLastUpTime(new Date());
- String groupTopicKey = client.consumerGroup + "@" + client.topic;
+ String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
List<Client> localClients =
eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
- if (StringUtils.equals(localClient.url, client.url)) {
+ if (StringUtils.equals(localClient.getUrl(), client.getUrl())) {
isContains = true;
- localClient.lastUpTime = client.lastUpTime;
+ localClient.setLastUpTime(client.getLastUpTime());
break;
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
index 119c1dea2..2abf68817 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
@@ -23,27 +23,116 @@ import java.util.Date;
public class Client {
- public String env;
+ private String env;
- public String idc;
+ private String idc;
- public String consumerGroup;
+ private String consumerGroup;
- public String topic;
+ private String topic;
- public String url;
+ private String url;
- public String sys;
+ private String sys;
- public String ip;
+ private String ip;
- public String pid;
+ private String pid;
- public String hostname;
+ private String hostname;
- public String apiVersion;
+ private Date lastUpTime;
+
+ public void setEnv(String env) {
+ this.env = env;
+ }
+
+ public void setIdc(String idc) {
+ this.idc = idc;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public void setSys(String sys) {
+ this.sys = sys;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public void setPid(String pid) {
+ this.pid = pid;
+ }
+
+ public void setHostname(String hostname) {
+ this.hostname = hostname;
+ }
+
+ public void setApiVersion(String apiVersion) {
+ this.apiVersion = apiVersion;
+ }
+
+ public void setLastUpTime(Date lastUpTime) {
+ this.lastUpTime = lastUpTime;
+ }
+
+ private String apiVersion;
+
+ public String getEnv() {
+ return env;
+ }
+
+ public String getIdc() {
+ return idc;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getSys() {
+ return sys;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public String getPid() {
+ return pid;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public String getApiVersion() {
+ return apiVersion;
+ }
+
+ public Date getLastUpTime() {
+ return lastUpTime;
+ }
- public Date lastUpTime;
@Override
public String toString() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org