You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/23 07:59:46 UTC
[incubator-eventmesh] branch master updated: [Issue #780] Modify the define level of EventListener from Topic to Consumer (#781)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 6f40eb9 [Issue #780] Modify the define level of EventListener from Topic to Consumer (#781)
6f40eb9 is described below
commit 6f40eb928e075a18a050bdff5e6cfe054c855e80
Author: lrhkobe <34...@users.noreply.github.com>
AuthorDate: Wed Feb 23 15:59:36 2022 +0800
[Issue #780] Modify the define level of EventListener from Topic to Consumer (#781)
* modify: add group field in UserAgent, delete ProducerGroup and ConsumerGroup field
* modify: fix checkstyle error
* modify: fix checkstyle error in ClientGroupWrapper.java
* modify: move EventListener to the Consumer level instead of binding it to a topic in EventMesh
* modify: fix the EventListener level problem in the gRPC protocol
* modify: fix the eventListener problem in test case
close #780
---
.../apache/eventmesh/api/consumer/Consumer.java | 6 +-
.../rocketmq/consumer/PushConsumerImpl.java | 23 +-
.../rocketmq/consumer/RocketMQConsumerImpl.java | 9 +-
.../rocketmq/consumer/PushConsumerImplTest.java | 10 +-
.../standalone/consumer/StandaloneConsumer.java | 13 +-
.../consumer/StandaloneConsumerAdaptor.java | 99 +------
.../runtime/core/plugin/MQConsumerWrapper.java | 10 +-
.../protocol/grpc/consumer/EventMeshConsumer.java | 8 +-
.../protocol/http/consumer/EventMeshConsumer.java | 254 +++++++++---------
.../tcp/client/group/ClientGroupWrapper.java | 286 +++++++++++----------
.../client/group/ClientSessionGroupMapping.java | 2 +-
.../core/protocol/tcp/client/session/Session.java | 4 +-
12 files changed, 334 insertions(+), 390 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java
index 87b3ac7..52b1a93 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java
@@ -38,7 +38,11 @@ public interface Consumer extends LifeCycle {
void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context);
- void subscribe(String topic, final EventListener listener) throws Exception;
+ //void subscribe(String topic, final EventListener listener) throws Exception;
+
+ void subscribe(String topic) throws Exception;
void unsubscribe(String topic);
+
+ void registerEventListener(EventListener listener);
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
index f6d6083..71f921f 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
@@ -59,7 +59,7 @@ public class PushConsumerImpl {
private final DefaultMQPushConsumer rocketmqPushConsumer;
private final Properties properties;
private AtomicBoolean started = new AtomicBoolean(false);
- private final Map<String, EventListener> subscribeTable = new ConcurrentHashMap<>();
+ private EventListener eventListener;
private final ClientConfig clientConfig;
public PushConsumerImpl(final Properties properties) {
@@ -134,9 +134,7 @@ public class PushConsumerImpl {
return rocketmqPushConsumer;
}
-
- public void subscribe(String topic, String subExpression, EventListener listener) {
- this.subscribeTable.put(topic, listener);
+ public void subscribe(String topic, String subExpression) {
try {
this.rocketmqPushConsumer.subscribe(topic, subExpression);
} catch (MQClientException e) {
@@ -146,7 +144,6 @@ public class PushConsumerImpl {
public void unsubscribe(String topic) {
- this.subscribeTable.remove(topic);
try {
this.rocketmqPushConsumer.unsubscribe(topic);
} catch (Exception e) {
@@ -197,9 +194,7 @@ public class PushConsumerImpl {
cloudEvent = cloudEventBuilder.build();
}
- EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic());
-
- if (listener == null) {
+ if (eventListener == null) {
throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
msg.getTopic()));
}
@@ -231,7 +226,7 @@ public class PushConsumerImpl {
eventMeshAsyncConsumeContext.setAbstractContext(context);
- listener.consume(cloudEvent, eventMeshAsyncConsumeContext);
+ eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
return EventMeshConsumeConcurrentlyStatus.valueOf(
contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
@@ -270,9 +265,7 @@ public class PushConsumerImpl {
cloudEvent = cloudEventBuilder.build();
}
- EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic());
-
- if (listener == null) {
+ if (eventListener == null) {
throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
msg.getTopic()));
}
@@ -306,12 +299,14 @@ public class PushConsumerImpl {
eventMeshAsyncConsumeContext.setAbstractContext(context);
- listener.consume(cloudEvent, eventMeshAsyncConsumeContext);
+ eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
return EventMeshConsumeConcurrentlyStatus.valueOf(
contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
}
-
+ public void registerEventListener(EventListener listener) {
+ this.eventListener = listener;
+ }
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
index 12fc214..eaf2a42 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
@@ -70,8 +70,8 @@ public class RocketMQConsumerImpl implements Consumer {
}
@Override
- public void subscribe(String topic, EventListener listener) throws Exception {
- pushConsumer.subscribe(topic, "*", listener);
+ public void subscribe(String topic) throws Exception {
+ pushConsumer.subscribe(topic, "*");
}
@Override
@@ -100,6 +100,11 @@ public class RocketMQConsumerImpl implements Consumer {
}
@Override
+ public void registerEventListener(EventListener listener) {
+ pushConsumer.registerEventListener(listener);
+ }
+
+ @Override
public synchronized void shutdown() {
pushConsumer.shutdown();
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
index 31bcd5f..683183b 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
@@ -89,15 +89,7 @@ public class PushConsumerImplTest {
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
- consumer.subscribe("HELLO_QUEUE", "*", new EventListener() {
-
- @Override
- public void consume(CloudEvent cloudEvent, org.apache.eventmesh.api.AsyncConsumeContext context) {
- assertThat(cloudEvent.getExtension("MESSAGE_ID")).isEqualTo("NewMsgId");
- assertThat(cloudEvent.getData()).isEqualTo(testBody);
- context.commit(EventMeshAction.CommitMessage);
- }
- });
+ consumer.subscribe("HELLO_QUEUE", "*");
((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java
index 225b208..f56fc2b 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java
@@ -37,6 +37,8 @@ public class StandaloneConsumer implements Consumer {
private StandaloneBroker standaloneBroker;
+ private EventListener listener;
+
private AtomicBoolean isStarted;
private final ConcurrentHashMap<String, SubScribeTask> subscribeTaskTable;
@@ -90,10 +92,8 @@ public class StandaloneConsumer implements Consumer {
}
@Override
- public void subscribe(String topic, EventListener listener) throws Exception {
- if (listener == null) {
- throw new IllegalArgumentException("listener cannot be null");
- }
+ public void subscribe(String topic) throws Exception {
+
if (subscribeTaskTable.containsKey(topic)) {
return;
}
@@ -116,4 +116,9 @@ public class StandaloneConsumer implements Consumer {
subscribeTaskTable.remove(topic);
}
}
+
+ @Override
+ public void registerEventListener(EventListener listener) {
+ this.listener = listener;
+ }
}
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
index c09d422..d7d7257 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
@@ -69,8 +69,8 @@ public class StandaloneConsumerAdaptor implements Consumer {
}
@Override
- public void subscribe(String topic, EventListener listener) throws Exception {
- consumer.subscribe(topic, listener);
+ public void subscribe(String topic) throws Exception {
+ consumer.subscribe(topic);
}
@Override
@@ -78,95 +78,8 @@ public class StandaloneConsumerAdaptor implements Consumer {
consumer.unsubscribe(topic);
}
- //@Override
- //public void init(Properties keyValue) throws Exception {
- // String producerGroup = keyValue.getProperty("producerGroup");
- //
- // MessagingAccessPointImpl messagingAccessPoint = new MessagingAccessPointImpl(keyValue);
- // consumer = (StandaloneConsumer) messagingAccessPoint.createConsumer(keyValue);
- //
- //}
- //
- //@Override
- //public void updateOffset(List<Message> msgs, AbstractContext context) {
- // for(Message message : msgs) {
- // consumer.updateOffset(message);
- // }
- //}
- //
- //@Override
- //public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
- // // todo: support subExpression
- // consumer.subscribe(topic, "*", listener);
- //}
- //
- //@Override
- //public void unsubscribe(String topic) {
- // consumer.unsubscribe(topic);
- //}
- //
- //@Override
- //public void subscribe(String topic, String subExpression, MessageListener listener) {
- // throw new UnsupportedOperationException("not supported yet");
- //}
- //
- //@Override
- //public void subscribe(String topic, MessageSelector selector, MessageListener listener) {
- // throw new UnsupportedOperationException("not supported yet");
- //}
- //
- //@Override
- //public <T> void subscribe(String topic, String subExpression, GenericMessageListener<T> listener) {
- // throw new UnsupportedOperationException("not supported yet");
- //}
- //
- //@Override
- //public <T> void subscribe(String topic, MessageSelector selector, GenericMessageListener<T> listener) {
- // throw new UnsupportedOperationException("not supported yet");
- //}
- //
- //@Override
- //public void subscribe(String topic, String subExpression, AsyncMessageListener listener) {
- // throw new UnsupportedOperationException("not supported yet");
- //}
- //
- //@Override
- //public void subscribe(String topic, MessageSelector selector, AsyncMessageListener listener) {
- // throw new UnsupportedOperationException("not supported yet");
- //}
- //
- //@Override
- //public <T> void subscribe(String topic, String subExpression, AsyncGenericMessageListener<T> listener) {
- // throw new UnsupportedOperationException("not supported yet");
- //}
- //
- //@Override
- //public <T> void subscribe(String topic, MessageSelector selector, AsyncGenericMessageListener<T> listener) {
- // throw new UnsupportedOperationException("not supported yet");
- //}
- //
- //@Override
- //public void updateCredential(Properties credentialProperties) {
- //
- //}
- //
- //@Override
- //public boolean isStarted() {
- // return consumer.isStarted();
- //}
- //
- //@Override
- //public boolean isClosed() {
- // return consumer.isClosed();
- //}
- //
- //@Override
- //public void start() {
- // consumer.start();
- //}
- //
- //@Override
- //public void shutdown() {
- // consumer.shutdown();
- //}
+ @Override
+ public void registerEventListener(EventListener listener) {
+ consumer.registerEventListener(listener);
+ }
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index edce54a..6bd8875 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -44,8 +44,8 @@ public class MQConsumerWrapper extends MQWrapper {
}
}
- public void subscribe(String topic, EventListener listener) throws Exception {
- meshMQPushConsumer.subscribe(topic, listener);
+ public void subscribe(String topic) throws Exception {
+ meshMQPushConsumer.subscribe(topic);
}
public void unsubscribe(String topic) throws Exception {
@@ -69,9 +69,9 @@ public class MQConsumerWrapper extends MQWrapper {
started.compareAndSet(false, true);
}
- //public void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently) {
- // meshMQPushConsumer.registerMessageListener(messageListenerConcurrently);
- //}
+ public void registerEventListener(EventListener listener) {
+ meshMQPushConsumer.registerEventListener(listener);
+ }
public void updateOffset(List<CloudEvent> events, AbstractContext eventMeshConsumeConcurrentlyContext) {
meshMQPushConsumer.updateOffset(events, eventMeshConsumeConcurrentlyContext);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
index 6bf6962..6324619 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
@@ -140,6 +140,8 @@ public class EventMeshConsumer {
keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup,
eventMeshGrpcConfiguration.eventMeshCluster));
persistentMqConsumer.init(keyValue);
+ EventListener clusterEventListner = createEventListener(SubscriptionMode.CLUSTERING);
+ persistentMqConsumer.registerEventListener(clusterEventListner);
Properties broadcastKeyValue = new Properties();
broadcastKeyValue.put("isBroadcast", "true");
@@ -148,6 +150,8 @@ public class EventMeshConsumer {
broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup,
eventMeshGrpcConfiguration.eventMeshCluster));
broadcastMqConsumer.init(broadcastKeyValue);
+ EventListener broadcastEventListner = createEventListener(SubscriptionMode.BROADCASTING);
+ broadcastMqConsumer.registerEventListener(broadcastEventListner);
serviceState = ServiceState.INITED;
logger.info("EventMeshConsumer [{}] initialized.............", consumerGroup);
@@ -184,9 +188,9 @@ public class EventMeshConsumer {
public void subscribe(String topic, SubscriptionMode subscriptionMode) throws Exception {
if (SubscriptionMode.CLUSTERING.equals(subscriptionMode)) {
- persistentMqConsumer.subscribe(topic, createEventListener(subscriptionMode));
+ persistentMqConsumer.subscribe(topic);
} else if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
- broadcastMqConsumer.subscribe(topic, createEventListener(subscriptionMode));
+ broadcastMqConsumer.subscribe(topic);
} else {
logger.error("Subscribe Failed. Incorrect Subscription Mode");
throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 96398d4..b4e5386 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -92,7 +92,69 @@ public class EventMeshConsumer {
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster));
persistentMqConsumer.init(keyValue);
- //
+ EventListener cluserEventListener = new EventListener() {
+ @Override
+ public void consume(CloudEvent event, AsyncConsumeContext context) {
+ String topic = event.getSubject();
+ //String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+ String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS);
+ String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID);
+
+ event = CloudEventBuilder.from(event)
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
+ .build();
+ //message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+ if (messageLogger.isDebugEnabled()) {
+ messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
+ } else {
+ messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
+ }
+
+ ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
+ topic, null);
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
+
+ if (currentTopicConfig == null) {
+ logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
+ try {
+ sendMessageBack(event, uniqueId, bizSeqNo);
+ //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ //context.ack();
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+ return;
+ } catch (Exception ex) {
+ //ignore
+ }
+ }
+
+ SubscriptionItem subscriptionItem = consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
+ HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
+ consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
+ topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
+ consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+
+ if (httpMessageHandler.handle(handleMsgContext)) {
+ //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+ //context.ack();
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+ } else {
+ try {
+ sendMessageBack(event, uniqueId, bizSeqNo);
+ } catch (Exception e) {
+ //ignore
+ }
+ //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ //context.ack();
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+ }
+ }
+ };
+ persistentMqConsumer.registerEventListener(cluserEventListener);
+
+ //broacast consumer
Properties broadcastKeyValue = new Properties();
broadcastKeyValue.put("isBroadcast", "true");
broadcastKeyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup());
@@ -100,6 +162,72 @@ public class EventMeshConsumer {
broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster));
broadcastMqConsumer.init(broadcastKeyValue);
+
+ EventListener broadcastEventListener = new EventListener() {
+ @Override
+ public void consume(CloudEvent event, AsyncConsumeContext context) {
+
+ event = CloudEventBuilder.from(event)
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .build();
+
+ String topic = event.getSubject();
+ String bizSeqNo = event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS).toString();
+ String uniqueId = event.getExtension(Constants.RMB_UNIQ_ID).toString();
+
+ if (messageLogger.isDebugEnabled()) {
+ messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
+ } else {
+ messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo,
+ uniqueId);
+ }
+
+ ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
+ consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
+
+ if (currentTopicConfig == null) {
+ logger.error("no topicConfig found, consumerGroup:{} topic:{}",
+ consumerGroupConf.getConsumerGroup(), topic);
+ try {
+ sendMessageBack(event, uniqueId, bizSeqNo);
+ //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ //context.ack();
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+ return;
+ } catch (Exception ex) {
+ //ignore
+ }
+ }
+
+ SubscriptionItem subscriptionItem = consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
+ HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
+ consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
+ topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
+ consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+
+ if (httpMessageHandler.handle(handleMsgContext)) {
+ //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+ //context.ack();
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+ } else {
+ try {
+ sendMessageBack(event, uniqueId, bizSeqNo);
+ } catch (Exception e) {
+ //ignore
+ }
+ //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ //context.ack();
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+ }
+ }
+ };
+ broadcastMqConsumer.registerEventListener(broadcastEventListener);
+
inited4Persistent.compareAndSet(false, true);
inited4Broadcast.compareAndSet(false, true);
logger.info("EventMeshConsumer [{}] inited.............", consumerGroupConf.getConsumerGroup());
@@ -113,130 +241,10 @@ public class EventMeshConsumer {
}
public void subscribe(String topic, SubscriptionItem subscriptionItem) throws Exception {
- EventListener listener = null;
if (!SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
- listener = new EventListener() {
- @Override
- public void consume(CloudEvent event, AsyncConsumeContext context) {
- String topic = event.getSubject();
- //String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
- String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS);
- String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID);
-
- event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
- .build();
- //message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
- if (messageLogger.isDebugEnabled()) {
- messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
- } else {
- messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
- }
-
- ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
- topic, null);
- EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
-
- if (currentTopicConfig == null) {
- logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
- try {
- sendMessageBack(event, uniqueId, bizSeqNo);
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
- //context.ack();
- eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
- return;
- } catch (Exception ex) {
- //ignore
- }
- }
- HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
- consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
- topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
- consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
-
- if (httpMessageHandler.handle(handleMsgContext)) {
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
- //context.ack();
- eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
- } else {
- try {
- sendMessageBack(event, uniqueId, bizSeqNo);
- } catch (Exception e) {
- //ignore
- }
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
- //context.ack();
- eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
- }
- }
- };
- persistentMqConsumer.subscribe(topic, listener);
+ persistentMqConsumer.subscribe(topic);
} else {
- listener = new EventListener() {
- @Override
- public void consume(CloudEvent event, AsyncConsumeContext context) {
-
- event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .build();
-
- String topic = event.getSubject();
- String bizSeqNo = event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS).toString();
- String uniqueId = event.getExtension(Constants.RMB_UNIQ_ID).toString();
-
- if (messageLogger.isDebugEnabled()) {
- messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
- } else {
- messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo,
- uniqueId);
- }
-
- ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
- consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
- EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
-
- if (currentTopicConfig == null) {
- logger.error("no topicConfig found, consumerGroup:{} topic:{}",
- consumerGroupConf.getConsumerGroup(), topic);
- try {
- sendMessageBack(event, uniqueId, bizSeqNo);
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
- //context.ack();
- eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
- return;
- } catch (Exception ex) {
- //ignore
- }
- }
- HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
- consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
- topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
- consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
-
- if (httpMessageHandler.handle(handleMsgContext)) {
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
- //context.ack();
- eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
- } else {
- try {
- sendMessageBack(event, uniqueId, bizSeqNo);
- } catch (Exception e) {
- //ignore
- }
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
- //context.ack();
- eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
- }
- }
- };
- broadcastMqConsumer.subscribe(topic, listener);
+ broadcastMqConsumer.subscribe(topic);
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 7d45219..e6937f1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -17,7 +17,6 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.group;
-import org.apache.eventmesh.api.AsyncConsumeContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
@@ -104,6 +103,8 @@ public class ClientGroupWrapper {
private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
new ConcurrentHashMap<String, Set<Session>>();
+ private ConcurrentHashMap<String, SubscriptionItem> subscriptions = new ConcurrentHashMap<>();
+
public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
private MQProducerWrapper mqProducerWrapper;
@@ -180,7 +181,12 @@ public class ClientGroupWrapper {
return mqProducerWrapper;
}
- public boolean addSubscription(String topic, Session session) throws Exception {
+ public boolean addSubscription(SubscriptionItem subscriptionItem, Session session) throws Exception {
+ if (subscriptionItem == null) {
+ logger.error("addSubscription param error,subscriptionItem is null", session);
+ return false;
+ }
+ String topic = subscriptionItem.getTopic();
if (session == null || !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
logger.error("addSubscription param error,topic:{},session:{}", topic, session);
@@ -203,6 +209,8 @@ public class ClientGroupWrapper {
.warn("addSubscription fail, group:{} topic:{} client:{}", group, topic,
session.getClient());
}
+
+ subscriptions.putIfAbsent(topic, subscriptionItem);
} catch (Exception e) {
logger
.error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
@@ -213,7 +221,12 @@ public class ClientGroupWrapper {
return r;
}
- public boolean removeSubscription(String topic, Session session) {
+ public boolean removeSubscription(SubscriptionItem subscriptionItem, Session session) {
+ if (subscriptionItem == null) {
+ logger.error("removeSubscription param error,subscriptionItem is null,session:{}", session);
+ return false;
+ }
+ String topic = subscriptionItem.getTopic();
if (session == null
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
@@ -238,6 +251,7 @@ public class ClientGroupWrapper {
}
if (CollectionUtils.size(topic2sessionInGroupMapping.get(topic)) == 0) {
topic2sessionInGroupMapping.remove(topic);
+ subscriptions.remove(topic);
logger.info("removeSubscription remove topic success, group:{} topic:{}",
group, topic);
}
@@ -401,6 +415,79 @@ public class ClientGroupWrapper {
persistentMsgConsumer.init(keyValue);
+ EventListener listener = (event, context) -> {
+ eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
+ .incrementAndGet();
+ event = CloudEventBuilder.from(event)
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshTCPConfiguration.eventMeshServerIp).build();
+ String topic = event.getSubject();
+
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+ (EventMeshAsyncConsumeContext) context;
+ Session session = downstreamDispatchStrategy
+ .select(group, topic, groupConsumerSessions);
+ String bizSeqNo = EventMeshUtil.getMessageBizSeq(event);
+ if (session == null) {
+ try {
+ Integer sendBackTimes = 0;
+ String sendBackFromEventMeshIp = "";
+ if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
+ EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) {
+ sendBackTimes = (Integer) event.getExtension(
+ EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
+ }
+ if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
+ EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) {
+ sendBackFromEventMeshIp = (String) event.getExtension(
+ EventMeshConstants.EVENTMESH_SEND_BACK_IP);
+ }
+
+ logger.error(
+ "found no session to downstream msg,groupName:{}, topic:{}, "
+ + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
+ group, topic, bizSeqNo, sendBackTimes,
+ sendBackFromEventMeshIp);
+
+ if (sendBackTimes >= eventMeshTCPServer
+ .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
+ logger.error(
+ "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
+ + "bizSeqNo:{}", eventMeshTCPServer
+ .getEventMeshTCPConfiguration()
+ .eventMeshTcpSendBackMaxTimes,
+ group, topic, bizSeqNo);
+ } else {
+ sendBackTimes++;
+ event = CloudEventBuilder.from(event)
+ .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
+ sendBackTimes.toString())
+ .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
+ eventMeshTCPConfiguration.eventMeshServerIp).build();
+ sendMsgBackToBroker(event, bizSeqNo);
+ }
+ } catch (Exception e) {
+ logger.warn("handle msg exception when no session found", e);
+ }
+
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+ return;
+ }
+
+ SubscriptionItem subscriptionItem = subscriptions.get(topic);
+ DownStreamMsgContext downStreamMsgContext =
+ new DownStreamMsgContext(event, session, persistentMsgConsumer,
+ eventMeshAsyncConsumeContext.getAbstractContext(), false,
+ subscriptionItem);
+ //msg put in eventmesh,waiting client ack
+ session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
+ session.downstreamMsg(downStreamMsgContext);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+ };
+ persistentMsgConsumer.registerEventListener(listener);
+
inited4Persistent.compareAndSet(false, true);
logger.info("init persistentMsgConsumer success, group:{}", group);
}
@@ -427,6 +514,66 @@ public class ClientGroupWrapper {
.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
broadCastMsgConsumer.init(keyValue);
+ EventListener listener = (event, context) -> {
+
+ eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
+ .incrementAndGet();
+ event = CloudEventBuilder.from(event)
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshTCPConfiguration.eventMeshServerIp).build();
+ String topic = event.getSubject();
+ // message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+ //message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ // String.valueOf(System.currentTimeMillis()));
+ //message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ // eventMeshTCPConfiguration.eventMeshServerIp);
+
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+ (EventMeshAsyncConsumeContext) context;
+ if (CollectionUtils.isEmpty(groupConsumerSessions)) {
+ logger.warn("found no session to downstream broadcast msg");
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+ return;
+ }
+
+ Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
+
+ SubscriptionItem subscriptionItem = subscriptions.get(topic);
+ DownStreamMsgContext downStreamMsgContext =
+ new DownStreamMsgContext(event, null, broadCastMsgConsumer,
+ eventMeshAsyncConsumeContext.getAbstractContext(), false,
+ subscriptionItem);
+
+ while (sessionsItr.hasNext()) {
+ Session session = sessionsItr.next();
+
+ if (!session.isAvailable(topic)) {
+ logger
+ .warn("downstream broadcast msg,session is not available,client:{}",
+ session.getClient());
+ continue;
+ }
+
+ downStreamMsgContext.session = session;
+
+ //downstream broadcast msg asynchronously
+ eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
+ .submit(new Runnable() {
+ @Override
+ public void run() {
+ //msg put in eventmesh,waiting client ack
+ session.getPusher()
+ .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
+ session.downstreamMsg(downStreamMsgContext);
+ }
+ });
+ }
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+ };
+ broadCastMsgConsumer.registerEventListener(listener);
+
inited4Broadcast.compareAndSet(false, true);
logger.info("init broadCastMsgConsumer success, group:{}", group);
}
@@ -441,139 +588,10 @@ public class ClientGroupWrapper {
}
public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
- EventListener listener = null;
if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
- listener = (event, context) -> {
-
- eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
- .incrementAndGet();
- event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- eventMeshTCPConfiguration.eventMeshServerIp).build();
- String topic = event.getSubject();
- // message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
- //message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
- // String.valueOf(System.currentTimeMillis()));
- //message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- // eventMeshTCPConfiguration.eventMeshServerIp);
-
- EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
- (EventMeshAsyncConsumeContext) context;
- if (CollectionUtils.isEmpty(groupConsumerSessions)) {
- logger.warn("found no session to downstream broadcast msg");
- eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
- return;
- }
-
- Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
-
- DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(event, null, broadCastMsgConsumer,
- eventMeshAsyncConsumeContext.getAbstractContext(), false,
- subscriptionItem);
-
- while (sessionsItr.hasNext()) {
- Session session = sessionsItr.next();
-
- if (!session.isAvailable(topic)) {
- logger
- .warn("downstream broadcast msg,session is not available,client:{}",
- session.getClient());
- continue;
- }
-
- downStreamMsgContext.session = session;
-
- //downstream broadcast msg asynchronously
- eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
- .submit(new Runnable() {
- @Override
- public void run() {
- //msg put in eventmesh,waiting client ack
- session.getPusher()
- .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
- session.downstreamMsg(downStreamMsgContext);
- }
- });
- }
-
- eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
- };
- broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
+ broadCastMsgConsumer.subscribe(subscriptionItem.getTopic());
} else {
- listener = (event, context) -> {
- eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
- .incrementAndGet();
- event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- eventMeshTCPConfiguration.eventMeshServerIp).build();
- String topic = event.getSubject();
-
- EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
- (EventMeshAsyncConsumeContext) context;
- Session session = downstreamDispatchStrategy
- .select(group, topic, groupConsumerSessions);
- String bizSeqNo = EventMeshUtil.getMessageBizSeq(event);
- if (session == null) {
- try {
- Integer sendBackTimes = 0;
- String sendBackFromEventMeshIp = "";
- if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
- EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) {
- sendBackTimes = (Integer) event.getExtension(
- EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
- }
- if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
- EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) {
- sendBackFromEventMeshIp = (String) event.getExtension(
- EventMeshConstants.EVENTMESH_SEND_BACK_IP);
- }
-
- logger.error(
- "found no session to downstream msg,groupName:{}, topic:{}, "
- + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
- group, topic, bizSeqNo, sendBackTimes,
- sendBackFromEventMeshIp);
-
- if (sendBackTimes >= eventMeshTCPServer
- .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
- logger.error(
- "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
- + "bizSeqNo:{}", eventMeshTCPServer
- .getEventMeshTCPConfiguration()
- .eventMeshTcpSendBackMaxTimes,
- group, topic, bizSeqNo);
- } else {
- sendBackTimes++;
- event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
- sendBackTimes.toString())
- .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
- eventMeshTCPConfiguration.eventMeshServerIp).build();
- sendMsgBackToBroker(event, bizSeqNo);
- }
- } catch (Exception e) {
- logger.warn("handle msg exception when no session found", e);
- }
-
- eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
- return;
- }
-
- DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(event, session, persistentMsgConsumer,
- eventMeshAsyncConsumeContext.getAbstractContext(), false,
- subscriptionItem);
- //msg put in eventmesh,waiting client ack
- session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
- session.downstreamMsg(downStreamMsgContext);
- eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
- };
- persistentMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
+ persistentMsgConsumer.subscribe(subscriptionItem.getTopic());
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 003186e..2ba749f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -283,7 +283,7 @@ public class ClientSessionGroupMapping {
*/
private void cleanSubscriptionInSession(Session session) throws Exception {
for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) {
- session.getClientGroupWrapper().get().removeSubscription(item.getTopic(), session);
+ session.getClientGroupWrapper().get().removeSubscription(item, session);
if (!session.getClientGroupWrapper().get().hasSubscription(item.getTopic())) {
session.getClientGroupWrapper().get().unsubscribe(item);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 37e7b5e..09b7396 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -171,7 +171,7 @@ public class Session {
clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().checkTopicExist(item.getTopic());
- clientGroupWrapper.get().addSubscription(item.getTopic(), this);
+ clientGroupWrapper.get().addSubscription(item, this);
subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
}
}
@@ -179,7 +179,7 @@ public class Session {
public void unsubscribe(List<SubscriptionItem> items) throws Exception {
for (SubscriptionItem item : items) {
sessionContext.subscribeTopics.remove(item.getTopic());
- clientGroupWrapper.get().removeSubscription(item.getTopic(), this);
+ clientGroupWrapper.get().removeSubscription(item, this);
if (!clientGroupWrapper.get().hasSubscription(item.getTopic())) {
clientGroupWrapper.get().unsubscribe(item);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org