You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/22 02:24:45 UTC
[incubator-eventmesh] branch master updated: [Issue #774] Optimize the object property description of eventmesh client (#775)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 12a223c [Issue #774] Optimize the object property description of eventmesh client (#775)
12a223c is described below
commit 12a223c4f28f9c46f559c1e88eb87325bb8a7639
Author: lrhkobe <34...@users.noreply.github.com>
AuthorDate: Tue Feb 22 10:24:38 2022 +0800
[Issue #774] Optimize the object property description of eventmesh client (#775)
* modify: add a group field to UserAgent; delete the ProducerGroup and ConsumerGroup fields
* modify: fix checkstyle error
* modify: fix checkstyle error in ClientGroupWrapper.java
close #774
---
.../eventmesh/common/protocol/tcp/UserAgent.java | 19 +-
.../eventmesh/tcp/common/EventMeshTestUtils.java | 6 +-
.../handler/ShowListenClientByTopicHandler.java | 2 +-
.../tcp/client/group/ClientGroupWrapper.java | 366 ++++++++++-----------
.../client/group/ClientSessionGroupMapping.java | 26 +-
.../core/protocol/tcp/client/task/HelloTask.java | 9 +-
.../protocol/tcp/client/task/RecommendTask.java | 8 +-
.../eventmesh/client/tcp/common/MessageUtils.java | 6 +-
.../client/tcp/common/EventMeshTestUtils.java | 6 +-
9 files changed, 213 insertions(+), 235 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java
index 077ae9a..9e1d98b 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java
@@ -36,8 +36,7 @@ public class UserAgent {
private String username;
private String password;
private String idc;
- private String producerGroup;
- private String consumerGroup;
+ private String group;
private String purpose;
@Builder.Default
private int unack = 0;
@@ -46,8 +45,7 @@ public class UserAgent {
}
public UserAgent(String env, String subsystem, String path, int pid, String host, int port, String version,
- String username, String password, String idc, String producerGroup, String consumerGroup,
- String purpose, int unack) {
+ String username, String password, String idc, String group, String purpose, int unack) {
this.env = env;
this.subsystem = subsystem;
this.path = path;
@@ -58,8 +56,7 @@ public class UserAgent {
this.username = username;
this.password = password;
this.idc = idc;
- this.producerGroup = producerGroup;
- this.consumerGroup = consumerGroup;
+ this.group = group;
this.purpose = purpose;
this.unack = unack;
}
@@ -67,8 +64,9 @@ public class UserAgent {
@Override
public String toString() {
return String.format(
- "UserAgent{env='%s', subsystem='%s', path='%s', pid=%d, host='%s', port=%d, version='%s', idc='%s', purpose='%s', unack='%d'}",
- env, subsystem, path, pid, host, port, version, idc, purpose, unack);
+ "UserAgent{env='%s', subsystem='%s', group='%s', path='%s', pid=%d, host='%s',"
+ + " port=%d, version='%s', idc='%s', purpose='%s', unack='%d'}",
+ env, subsystem, group, path, pid, host, port, version, idc, purpose, unack);
}
@Override
@@ -99,6 +97,10 @@ public class UserAgent {
return false;
}
+ if (!Objects.equals(group, userAgent.group)) {
+ return false;
+ }
+
if (!Objects.equals(path, userAgent.path)) {
return false;
@@ -134,6 +136,7 @@ public class UserAgent {
@Override
public int hashCode() {
int result = subsystem != null ? subsystem.hashCode() : 0;
+ result = 31 * result + (group != null ? group.hashCode() : 0);
result = 31 * result + (path != null ? path.hashCode() : 0);
result = 31 * result + pid;
result = 31 * result + (host != null ? host.hashCode() : 0);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index f7da239..dbb2729 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -51,8 +51,7 @@ public class EventMeshTestUtils {
.host("127.0.0.1")
.password(generateRandomString(8))
.username("PU4283")
- .producerGroup("EventmeshTest-ProducerGroup")
- .consumerGroup("EventmeshTest-ConsumerGroup")
+ .group("EventmeshTestGroup")
.path("/data/app/umg_proxy")
.port(8362)
.subsystem("5023")
@@ -70,8 +69,7 @@ public class EventMeshTestUtils {
.host("127.0.0.1")
.password(generateRandomString(8))
.username("PU4283")
- .producerGroup("EventmeshTest-ProducerGroup")
- .consumerGroup("EventmeshTest-ConsumerGroup")
+ .group("EventmeshTestGroup")
.path("/data/app/umg_proxy")
.port(9362)
.subsystem("5017")
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
index 95b3272..a7b11be 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
@@ -67,7 +67,7 @@ public class ShowListenClientByTopicHandler implements HttpHandler {
for (ClientGroupWrapper cgw : clientGroupMap.values()) {
Set<Session> listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic);
if (listenSessionSet != null && listenSessionSet.size() > 0) {
- result += String.format("group:%s", cgw.getConsumerGroup()) + newLine;
+ result += String.format("group:%s", cgw.getGroup()) + newLine;
for (Session session : listenSessionSet) {
UserAgent userAgent = session.getClient();
result += String.format("pid=%s | ip=%s | port=%s | path=%s | version=%s", userAgent.getPid(), userAgent
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 091f510..7d45219 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -69,13 +69,10 @@ public class ClientGroupWrapper {
public static Logger logger = LoggerFactory.getLogger(ClientGroupWrapper.class);
- private String producerGroup;
-
- private String consumerGroup;
-
- //can be sysid + ext(eg dcn)
private String sysId;
+ private String group;
+
private EventMeshTCPConfiguration eventMeshTCPConfiguration;
private EventMeshTCPServer eventMeshTCPServer;
@@ -105,29 +102,29 @@ public class ClientGroupWrapper {
private MQConsumerWrapper broadCastMsgConsumer;
private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
- new ConcurrentHashMap<String, Set<Session>>();
+ new ConcurrentHashMap<String, Set<Session>>();
public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
private MQProducerWrapper mqProducerWrapper;
- public ClientGroupWrapper(String sysId, String producerGroup, String consumerGroup,
+ public ClientGroupWrapper(String sysId, String group,
EventMeshTCPServer eventMeshTCPServer,
DownstreamDispatchStrategy downstreamDispatchStrategy) {
this.sysId = sysId;
- this.producerGroup = producerGroup;
- this.consumerGroup = consumerGroup;
+ this.group = group;
this.eventMeshTCPServer = eventMeshTCPServer;
this.eventMeshTCPConfiguration = eventMeshTCPServer.getEventMeshTCPConfiguration();
this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
- this.eventMeshTcpMonitor = Preconditions.checkNotNull(eventMeshTCPServer.getEventMeshTcpMonitor());
+ this.eventMeshTcpMonitor =
+ Preconditions.checkNotNull(eventMeshTCPServer.getEventMeshTcpMonitor());
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
this.persistentMsgConsumer = new MQConsumerWrapper(
- eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
this.broadCastMsgConsumer = new MQConsumerWrapper(
- eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
this.mqProducerWrapper = new MQProducerWrapper(
- eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
}
public ConcurrentHashMap<String, Set<Session>> getTopic2sessionInGroupMapping() {
@@ -149,13 +146,14 @@ public class ClientGroupWrapper {
}
public boolean send(UpStreamMsgContext upStreamMsgContext, SendCallback sendCallback)
- throws Exception {
+ throws Exception {
mqProducerWrapper.send(upStreamMsgContext.getEvent(), sendCallback);
return true;
}
- public void request(UpStreamMsgContext upStreamMsgContext, RequestReplyCallback rrCallback, long timeout)
- throws Exception {
+ public void request(UpStreamMsgContext upStreamMsgContext, RequestReplyCallback rrCallback,
+ long timeout)
+ throws Exception {
mqProducerWrapper.request(upStreamMsgContext.getEvent(), rrCallback, timeout);
}
@@ -168,10 +166,11 @@ public class ClientGroupWrapper {
@Override
public void onException(OnExceptionContext context) {
- String bizSeqNo = (String) upStreamMsgContext.getEvent().getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
+ String bizSeqNo = (String) upStreamMsgContext.getEvent()
+ .getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
logger.error("reply err! topic:{}, bizSeqNo:{}, client:{}",
- upStreamMsgContext.getEvent().getSubject(), bizSeqNo,
- upStreamMsgContext.getSession().getClient(), context.getException());
+ upStreamMsgContext.getEvent().getSubject(), bizSeqNo,
+ upStreamMsgContext.getSession().getClient(), context.getException());
}
});
return true;
@@ -182,8 +181,8 @@ public class ClientGroupWrapper {
}
public boolean addSubscription(String topic, Session session) throws Exception {
- if (session == null || !StringUtils.equalsIgnoreCase(consumerGroup,
- EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+ if (session == null || !StringUtils.equalsIgnoreCase(group,
+ EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
logger.error("addSubscription param error,topic:{},session:{}", topic, session);
return false;
}
@@ -197,16 +196,16 @@ public class ClientGroupWrapper {
}
r = topic2sessionInGroupMapping.get(topic).add(session);
if (r) {
- logger.info("addSubscription success, group:{} topic:{} client:{}", consumerGroup,
- topic, session.getClient());
+ logger.info("addSubscription success, group:{} topic:{} client:{}", group,
+ topic, session.getClient());
} else {
logger
- .warn("addSubscription fail, group:{} topic:{} client:{}", consumerGroup, topic,
- session.getClient());
+ .warn("addSubscription fail, group:{} topic:{} client:{}", group, topic,
+ session.getClient());
}
} catch (Exception e) {
logger
- .error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
+ .error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
throw new Exception("addSubscription fail");
} finally {
this.groupLock.writeLock().unlock();
@@ -216,8 +215,8 @@ public class ClientGroupWrapper {
public boolean removeSubscription(String topic, Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(consumerGroup,
- EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(group,
+ EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
logger.error("removeSubscription param error,topic:{},session:{}", topic, session);
return false;
}
@@ -229,22 +228,22 @@ public class ClientGroupWrapper {
r = topic2sessionInGroupMapping.get(topic).remove(session);
if (r) {
logger.info(
- "removeSubscription remove session success, group:{} topic:{} client:{}",
- consumerGroup, topic, session.getClient());
+ "removeSubscription remove session success, group:{} topic:{} client:{}",
+ group, topic, session.getClient());
} else {
logger.warn(
- "removeSubscription remove session failed, group:{} topic:{} client:{}",
- consumerGroup, topic, session.getClient());
+ "removeSubscription remove session failed, group:{} topic:{} client:{}",
+ group, topic, session.getClient());
}
}
if (CollectionUtils.size(topic2sessionInGroupMapping.get(topic)) == 0) {
topic2sessionInGroupMapping.remove(topic);
logger.info("removeSubscription remove topic success, group:{} topic:{}",
- consumerGroup, topic);
+ group, topic);
}
} catch (Exception e) {
logger.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(),
- e);
+ e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -257,9 +256,9 @@ public class ClientGroupWrapper {
}
Properties keyValue = new Properties();
- keyValue.put("producerGroup", producerGroup);
+ keyValue.put("producerGroup", group);
keyValue.put("instanceName", EventMeshUtil
- .buildMeshTcpClientID(sysId, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
+ .buildMeshTcpClientID(sysId, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
//TODO for defibus
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
@@ -267,7 +266,7 @@ public class ClientGroupWrapper {
mqProducerWrapper.init(keyValue);
mqProducerWrapper.start();
producerStarted.compareAndSet(false, true);
- logger.info("starting producer success, group:{}", producerGroup);
+ logger.info("starting producer success, group:{}", group);
}
public synchronized void shutdownProducer() throws Exception {
@@ -276,29 +275,21 @@ public class ClientGroupWrapper {
}
mqProducerWrapper.shutdown();
producerStarted.compareAndSet(true, false);
- logger.info("shutdown producer success for group:{}", producerGroup);
- }
-
- public String getProducerGroup() {
- return producerGroup;
- }
-
- public void setProducerGroup(String producerGroup) {
- this.producerGroup = producerGroup;
+ logger.info("shutdown producer success for group:{}", group);
}
- public String getConsumerGroup() {
- return consumerGroup;
+ public String getGroup() {
+ return group;
}
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
+ public void setGroup(String group) {
+ this.group = group;
}
public boolean addGroupConsumerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(consumerGroup,
- EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(group,
+ EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
logger.error("addGroupConsumerSession param error,session:{}", session);
return false;
}
@@ -308,12 +299,12 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupConsumerSessions.add(session);
if (r) {
- logger.info("addGroupConsumerSession success, group:{} client:{}", consumerGroup,
- session.getClient());
+ logger.info("addGroupConsumerSession success, group:{} client:{}", group,
+ session.getClient());
}
} catch (Exception e) {
- logger.error("addGroupConsumerSession error! group:{} client:{}", consumerGroup,
- session.getClient(), e);
+ logger.error("addGroupConsumerSession error! group:{} client:{}", group,
+ session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -322,8 +313,8 @@ public class ClientGroupWrapper {
public boolean addGroupProducerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(producerGroup,
- EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(group,
+ EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
logger.error("addGroupProducerSession param error,session:{}", session);
return false;
}
@@ -333,12 +324,12 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupProducerSessions.add(session);
if (r) {
- logger.info("addGroupProducerSession success, group:{} client:{}", producerGroup,
- session.getClient());
+ logger.info("addGroupProducerSession success, group:{} client:{}", group,
+ session.getClient());
}
} catch (Exception e) {
- logger.error("addGroupProducerSession error! group:{} client:{}", producerGroup,
- session.getClient(), e);
+ logger.error("addGroupProducerSession error! group:{} client:{}", group,
+ session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -347,8 +338,8 @@ public class ClientGroupWrapper {
public boolean removeGroupConsumerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(consumerGroup,
- EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(group,
+ EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
logger.error("removeGroupConsumerSession param error,session:{}", session);
return false;
}
@@ -358,12 +349,12 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupConsumerSessions.remove(session);
if (r) {
- logger.info("removeGroupConsumerSession success, group:{} client:{}", consumerGroup,
- session.getClient());
+ logger.info("removeGroupConsumerSession success, group:{} client:{}", group,
+ session.getClient());
}
} catch (Exception e) {
- logger.error("removeGroupConsumerSession error! group:{} client:{}", consumerGroup,
- session.getClient(), e);
+ logger.error("removeGroupConsumerSession error! group:{} client:{}", group,
+ session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -372,8 +363,8 @@ public class ClientGroupWrapper {
public boolean removeGroupProducerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(producerGroup,
- EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(group,
+ EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
logger.error("removeGroupProducerSession param error,session:{}", session);
return false;
}
@@ -383,12 +374,12 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupProducerSessions.remove(session);
if (r) {
- logger.info("removeGroupProducerSession success, group:{} client:{}", producerGroup,
- session.getClient());
+ logger.info("removeGroupProducerSession success, group:{} client:{}", group,
+ session.getClient());
}
} catch (Exception e) {
- logger.error("removeGroupProducerSession error! group:{} client:{}", producerGroup,
- session.getClient(), e);
+ logger.error("removeGroupProducerSession error! group:{} client:{}", group,
+ session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -403,15 +394,15 @@ public class ClientGroupWrapper {
Properties keyValue = new Properties();
keyValue.put("isBroadcast", "false");
- keyValue.put("consumerGroup", consumerGroup);
+ keyValue.put("consumerGroup", group);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
keyValue.put("instanceName", EventMeshUtil
- .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+ .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
persistentMsgConsumer.init(keyValue);
inited4Persistent.compareAndSet(false, true);
- logger.info("init persistentMsgConsumer success, group:{}", consumerGroup);
+ logger.info("init persistentMsgConsumer success, group:{}", group);
}
public synchronized void startClientGroupPersistentConsumer() throws Exception {
@@ -420,7 +411,7 @@ public class ClientGroupWrapper {
}
persistentMsgConsumer.start();
started4Persistent.compareAndSet(false, true);
- logger.info("starting persistentMsgConsumer success, group:{}", consumerGroup);
+ logger.info("starting persistentMsgConsumer success, group:{}", group);
}
public synchronized void initClientGroupBroadcastConsumer() throws Exception {
@@ -430,14 +421,14 @@ public class ClientGroupWrapper {
Properties keyValue = new Properties();
keyValue.put("isBroadcast", "true");
- keyValue.put("consumerGroup", consumerGroup);
+ keyValue.put("consumerGroup", group);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
keyValue.put("instanceName", EventMeshUtil
- .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+ .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
broadCastMsgConsumer.init(keyValue);
inited4Broadcast.compareAndSet(false, true);
- logger.info("init broadCastMsgConsumer success, group:{}", consumerGroup);
+ logger.info("init broadCastMsgConsumer success, group:{}", group);
}
public synchronized void startClientGroupBroadcastConsumer() throws Exception {
@@ -446,122 +437,123 @@ public class ClientGroupWrapper {
}
broadCastMsgConsumer.start();
started4Broadcast.compareAndSet(false, true);
- logger.info("starting broadCastMsgConsumer success, group:{}", consumerGroup);
+ logger.info("starting broadCastMsgConsumer success, group:{}", group);
}
public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
EventListener listener = null;
if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
- listener = new EventListener() {
- @Override
- public void consume(CloudEvent event, AsyncConsumeContext context) {
-
- eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum().incrementAndGet();
- event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- eventMeshTCPConfiguration.eventMeshServerIp).build();
- String topic = event.getSubject();
- // message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
- //message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
- // String.valueOf(System.currentTimeMillis()));
- //message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- // eventMeshTCPConfiguration.eventMeshServerIp);
-
- EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
- (EventMeshAsyncConsumeContext) context;
- if (CollectionUtils.isEmpty(groupConsumerSessions)) {
- logger.warn("found no session to downstream broadcast msg");
- eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
- return;
- }
+ listener = (event, context) -> {
- Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
+ eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
+ .incrementAndGet();
+ event = CloudEventBuilder.from(event)
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshTCPConfiguration.eventMeshServerIp).build();
+ String topic = event.getSubject();
+ // message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+ //message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ // String.valueOf(System.currentTimeMillis()));
+ //message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ // eventMeshTCPConfiguration.eventMeshServerIp);
+
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+ (EventMeshAsyncConsumeContext) context;
+ if (CollectionUtils.isEmpty(groupConsumerSessions)) {
+ logger.warn("found no session to downstream broadcast msg");
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+ return;
+ }
- DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(event, null, broadCastMsgConsumer,
- eventMeshAsyncConsumeContext.getAbstractContext(), false,
- subscriptionItem);
+ Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
- while (sessionsItr.hasNext()) {
- Session session = sessionsItr.next();
+ DownStreamMsgContext downStreamMsgContext =
+ new DownStreamMsgContext(event, null, broadCastMsgConsumer,
+ eventMeshAsyncConsumeContext.getAbstractContext(), false,
+ subscriptionItem);
- if (!session.isAvailable(topic)) {
- logger
- .warn("downstream broadcast msg,session is not available,client:{}",
- session.getClient());
- continue;
- }
+ while (sessionsItr.hasNext()) {
+ Session session = sessionsItr.next();
- downStreamMsgContext.session = session;
-
- //downstream broadcast msg asynchronously
- eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
- .submit(new Runnable() {
- @Override
- public void run() {
- //msg put in eventmesh,waiting client ack
- session.getPusher()
- .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
- session.downstreamMsg(downStreamMsgContext);
- }
- });
+ if (!session.isAvailable(topic)) {
+ logger
+ .warn("downstream broadcast msg,session is not available,client:{}",
+ session.getClient());
+ continue;
}
- eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+ downStreamMsgContext.session = session;
+
+ //downstream broadcast msg asynchronously
+ eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
+ .submit(new Runnable() {
+ @Override
+ public void run() {
+ //msg put in eventmesh,waiting client ack
+ session.getPusher()
+ .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
+ session.downstreamMsg(downStreamMsgContext);
+ }
+ });
}
+
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
};
broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
} else {
listener = (event, context) -> {
- eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum().incrementAndGet();
+ eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
+ .incrementAndGet();
event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- eventMeshTCPConfiguration.eventMeshServerIp).build();
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshTCPConfiguration.eventMeshServerIp).build();
String topic = event.getSubject();
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
- (EventMeshAsyncConsumeContext) context;
+ (EventMeshAsyncConsumeContext) context;
Session session = downstreamDispatchStrategy
- .select(consumerGroup, topic, groupConsumerSessions);
+ .select(group, topic, groupConsumerSessions);
String bizSeqNo = EventMeshUtil.getMessageBizSeq(event);
if (session == null) {
try {
Integer sendBackTimes = 0;
String sendBackFromEventMeshIp = "";
if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
- EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) {
- sendBackTimes = (Integer) event.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
+ EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) {
+ sendBackTimes = (Integer) event.getExtension(
+ EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
}
if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
- EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) {
- sendBackFromEventMeshIp = (String) event.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP);
+ EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) {
+ sendBackFromEventMeshIp = (String) event.getExtension(
+ EventMeshConstants.EVENTMESH_SEND_BACK_IP);
}
logger.error(
- "found no session to downstream msg,groupName:{}, topic:{}, "
- + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
- consumerGroup, topic, bizSeqNo, sendBackTimes,
- sendBackFromEventMeshIp);
+ "found no session to downstream msg,groupName:{}, topic:{}, "
+ + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
+ group, topic, bizSeqNo, sendBackTimes,
+ sendBackFromEventMeshIp);
if (sendBackTimes >= eventMeshTCPServer
- .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
+ .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
logger.error(
- "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
- + "bizSeqNo:{}", eventMeshTCPServer
- .getEventMeshTCPConfiguration()
- .eventMeshTcpSendBackMaxTimes,
- consumerGroup, topic, bizSeqNo);
+ "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
+ + "bizSeqNo:{}", eventMeshTCPServer
+ .getEventMeshTCPConfiguration()
+ .eventMeshTcpSendBackMaxTimes,
+ group, topic, bizSeqNo);
} else {
sendBackTimes++;
event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
- sendBackTimes.toString())
- .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
- eventMeshTCPConfiguration.eventMeshServerIp).build();
+ .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
+ sendBackTimes.toString())
+ .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
+ eventMeshTCPConfiguration.eventMeshServerIp).build();
sendMsgBackToBroker(event, bizSeqNo);
}
} catch (Exception e) {
@@ -573,9 +565,9 @@ public class ClientGroupWrapper {
}
DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(event, session, persistentMsgConsumer,
- eventMeshAsyncConsumeContext.getAbstractContext(), false,
- subscriptionItem);
+ new DownStreamMsgContext(event, session, persistentMsgConsumer,
+ eventMeshAsyncConsumeContext.getAbstractContext(), false,
+ subscriptionItem);
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
@@ -596,7 +588,7 @@ public class ClientGroupWrapper {
public synchronized void shutdownBroadCastConsumer() throws Exception {
if (started4Broadcast.get()) {
broadCastMsgConsumer.shutdown();
- logger.info("broadcast consumer group:{} shutdown...", consumerGroup);
+ logger.info("broadcast consumer group:{} shutdown...", group);
}
started4Broadcast.compareAndSet(true, false);
inited4Broadcast.compareAndSet(true, false);
@@ -607,7 +599,7 @@ public class ClientGroupWrapper {
if (started4Persistent.get()) {
persistentMsgConsumer.shutdown();
- logger.info("persistent consumer group:{} shutdown...", consumerGroup);
+ logger.info("persistent consumer group:{} shutdown...", group);
}
started4Persistent.compareAndSet(true, false);
inited4Persistent.compareAndSet(true, false);
@@ -651,7 +643,7 @@ public class ClientGroupWrapper {
}
public void setDownstreamDispatchStrategy(
- DownstreamDispatchStrategy downstreamDispatchStrategy) {
+ DownstreamDispatchStrategy downstreamDispatchStrategy) {
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
}
@@ -662,24 +654,24 @@ public class ClientGroupWrapper {
private String pushMsgToEventMesh(CloudEvent msg, String ip, int port) throws Exception {
StringBuilder targetUrl = new StringBuilder();
targetUrl.append("http://").append(ip).append(":").append(port)
- .append("/eventMesh/msg/push");
+ .append("/eventMesh/msg/push");
HttpTinyClient.HttpResult result = null;
try {
logger.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl,
- msg);
+ msg);
List<String> paramValues = new ArrayList<String>();
paramValues.add("msg");
paramValues.add(JsonUtils.serialize(msg));
paramValues.add("group");
- paramValues.add(consumerGroup);
+ paramValues.add(group);
result = HttpTinyClient.httpPost(
- targetUrl.toString(),
- null,
- paramValues,
- "UTF-8",
- 3000);
+ targetUrl.toString(),
+ null,
+ paramValues,
+ "UTF-8",
+ 3000);
} catch (Exception e) {
logger.error("httpPost " + targetUrl + " is fail,", e);
//throw new RuntimeException("httpPost " + targetUrl + " is fail," , e);
@@ -691,7 +683,7 @@ public class ClientGroupWrapper {
} else {
throw new Exception("httpPost targetUrl[" + targetUrl
- + "] is not OK when getContentThroughHttp, httpResult: " + result + ".");
+ + "] is not OK when getContentThroughHttp, httpResult: " + result + ".");
}
}
@@ -707,22 +699,22 @@ public class ClientGroupWrapper {
long startTime = System.currentTimeMillis();
long taskExcuteTime = startTime;
send(new UpStreamMsgContext(null, event, null, startTime, taskExcuteTime),
- new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- logger.info(
- "consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, "
- + "topic:{}", consumerGroup, bizSeqNo, topic);
- }
+ new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ logger.info(
+ "group:{} consume fail, sendMessageBack success, bizSeqno:{}, "
+ + "topic:{}", group, bizSeqNo, topic);
+ }
- @Override
- public void onException(OnExceptionContext context) {
- logger.warn(
- "consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{},"
- + " topic:{}", consumerGroup, bizSeqNo, topic);
- }
+ @Override
+ public void onException(OnExceptionContext context) {
+ logger.warn(
+ "group:{} consume fail, sendMessageBack fail, bizSeqno:{},"
+ + " topic:{}", group, bizSeqNo, topic);
+ }
- });
+ });
eventMeshTcpMonitor.getTcpSummaryMetrics().getEventMesh2mqMsgNum().incrementAndGet();
} catch (Exception e) {
logger.warn("try send msg back to broker failed");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 548d7e5..003186e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -179,29 +179,29 @@ public class ClientSessionGroupMapping {
}
}
- private ClientGroupWrapper constructClientGroupWrapper(String sysId, String producerGroup, String consumerGroup,
+ private ClientGroupWrapper constructClientGroupWrapper(String sysId, String group,
EventMeshTCPServer eventMeshTCPServer,
DownstreamDispatchStrategy downstreamDispatchStrategy) {
- return new ClientGroupWrapper(sysId, producerGroup, consumerGroup, eventMeshTCPServer,
+ return new ClientGroupWrapper(sysId, group, eventMeshTCPServer,
downstreamDispatchStrategy);
}
private void initClientGroupWrapper(UserAgent user, Session session) throws Exception {
- if (!lockMap.containsKey(user.getSubsystem())) {
- Object obj = lockMap.putIfAbsent(user.getSubsystem(), new Object());
+ if (!lockMap.containsKey(user.getGroup())) {
+ Object obj = lockMap.putIfAbsent(user.getGroup(), new Object());
if (obj == null) {
- logger.info("add lock to map for subsystem:{}", user.getSubsystem());
+ logger.info("add lock to map for group:{}", user.getGroup());
}
}
- synchronized (lockMap.get(user.getSubsystem())) {
- if (!clientGroupMap.containsKey(user.getSubsystem())) {
- ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getProducerGroup(),
- user.getConsumerGroup(), eventMeshTCPServer, new FreePriorityDispatchStrategy());
- clientGroupMap.put(user.getSubsystem(), cgw);
- logger.info("create new ClientGroupWrapper, subsystem:{}", user.getSubsystem());
+ synchronized (lockMap.get(user.getGroup())) {
+ if (!clientGroupMap.containsKey(user.getGroup())) {
+ ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getGroup(),
+ eventMeshTCPServer, new FreePriorityDispatchStrategy());
+ clientGroupMap.put(user.getGroup(), cgw);
+ logger.info("create new ClientGroupWrapper, group:{}", user.getGroup());
}
- ClientGroupWrapper cgw = clientGroupMap.get(user.getSubsystem());
+ ClientGroupWrapper cgw = clientGroupMap.get(user.getGroup());
if (EventMeshConstants.PURPOSE_PUB.equals(user.getPurpose())) {
startClientGroupProducer(cgw, session);
@@ -307,7 +307,7 @@ public class ClientSessionGroupMapping {
continue;
}
Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy()
- .select(session.getClientGroupWrapper().get().getConsumerGroup(),
+ .select(session.getClientGroupWrapper().get().getGroup(),
downStreamMsgContext.event.getSubject(),
Objects.requireNonNull(session.getClientGroupWrapper().get()).groupConsumerSessions);
if (reChooseSession != null) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
index 4d25335..12ca6ff 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
@@ -115,14 +115,9 @@ public class HelloTask extends AbstractTask {
throw new Exception("client purpose config is error");
}
- if (StringUtils.equals(EventMeshConstants.PURPOSE_PUB, user.getPurpose())
- && StringUtils.isBlank(user.getProducerGroup())) {
- throw new Exception("client producerGroup cannot be null");
+ if (StringUtils.isBlank(user.getGroup())) {
+ throw new Exception("client group cannot be null");
}
- if (StringUtils.equals(EventMeshConstants.PURPOSE_SUB, user.getPurpose())
- && StringUtils.isBlank(user.getConsumerGroup())) {
- throw new Exception("client consumerGroup cannot be null");
- }
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java
index da595ba..6499e8e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java
@@ -97,12 +97,6 @@ public class RecommendTask extends AbstractTask {
if (userAgent == null) {
return null;
}
- if (EventMeshConstants.PURPOSE_PUB.equals(userAgent.getPurpose())) {
- return userAgent.getProducerGroup();
- } else if (EventMeshConstants.PURPOSE_SUB.equals(userAgent.getPurpose())) {
- return userAgent.getConsumerGroup();
- } else {
- return null;
- }
+ return userAgent.getGroup();
}
}
\ No newline at end of file
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
index 96b6ad7..e006fa8 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
@@ -150,8 +150,7 @@ public class MessageUtils {
.pid(agent.getPid())
.version(agent.getVersion())
.idc(agent.getIdc())
- .consumerGroup(agent.getConsumerGroup())
- .producerGroup(agent.getProducerGroup())
+ .group(agent.getGroup())
.purpose(EventMeshCommon.USER_AGENT_PURPOSE_SUB)
.build();
}
@@ -168,8 +167,7 @@ public class MessageUtils {
.pid(agent.getPid())
.version(agent.getVersion())
.idc(agent.getIdc())
- .consumerGroup(agent.getConsumerGroup())
- .producerGroup(agent.getProducerGroup())
+ .group(agent.getGroup())
.purpose(EventMeshCommon.USER_AGENT_PURPOSE_PUB)
.build();
}
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java
index 94d273b..42db31b 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java
@@ -38,8 +38,7 @@ public class EventMeshTestUtils {
.host("127.0.0.1")
.password(generateRandomString(8))
.username("PU4283")
- .consumerGroup("EventmeshTest-ConsumerGroup")
- .producerGroup("EventmeshTest-ProducerGroup")
+ .group("EventmeshTestGroup")
.path("/data/app/umg_proxy")
.port(8362)
.subsystem("5023")
@@ -54,8 +53,7 @@ public class EventMeshTestUtils {
.host("127.0.0.1")
.password(generateRandomString(8))
.username("PU4283")
- .consumerGroup("EventmeshTest-ConsumerGroup")
- .producerGroup("EventmeshTest-ProducerGroup")
+ .group("EventmeshTestGroup")
.path("/data/app/umg_proxy")
.port(9362)
.subsystem("5017")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org