You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/23 07:59:46 UTC

[incubator-eventmesh] branch master updated: [Issue #780] Modify the define level of EventListener from Topic to Consumer (#781)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f40eb9  [Issue #780] Modify the define level  of EventListener from Topic to Consumer (#781)
6f40eb9 is described below

commit 6f40eb928e075a18a050bdff5e6cfe054c855e80
Author: lrhkobe <34...@users.noreply.github.com>
AuthorDate: Wed Feb 23 15:59:36 2022 +0800

    [Issue #780] Modify the define level  of EventListener from Topic to Consumer (#781)
    
    * modify: add group field in UserAgent, delete ProducerGroup and ConsumerGroup field
    
    * modify: fix checkstyle error
    
    * modify: fix checkstyle error in ClientGroupWrapper.java
    
    * modify: move EventListener to the Consumer level instead of binding it to a topic in EventMesh
    
    * modify: fix the eventListener level problem in gRPC protocol
    
    * modify: fix the eventListener problem in test case
    
    close #780
---
 .../apache/eventmesh/api/consumer/Consumer.java    |   6 +-
 .../rocketmq/consumer/PushConsumerImpl.java        |  23 +-
 .../rocketmq/consumer/RocketMQConsumerImpl.java    |   9 +-
 .../rocketmq/consumer/PushConsumerImplTest.java    |  10 +-
 .../standalone/consumer/StandaloneConsumer.java    |  13 +-
 .../consumer/StandaloneConsumerAdaptor.java        |  99 +------
 .../runtime/core/plugin/MQConsumerWrapper.java     |  10 +-
 .../protocol/grpc/consumer/EventMeshConsumer.java  |   8 +-
 .../protocol/http/consumer/EventMeshConsumer.java  | 254 +++++++++---------
 .../tcp/client/group/ClientGroupWrapper.java       | 286 +++++++++++----------
 .../client/group/ClientSessionGroupMapping.java    |   2 +-
 .../core/protocol/tcp/client/session/Session.java  |   4 +-
 12 files changed, 334 insertions(+), 390 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java
index 87b3ac7..52b1a93 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java
@@ -38,7 +38,11 @@ public interface Consumer extends LifeCycle {
 
     void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context);
 
-    void subscribe(String topic, final EventListener listener) throws Exception;
+    //void subscribe(String topic, final EventListener listener) throws Exception;
+
+    void subscribe(String topic) throws Exception;
 
     void unsubscribe(String topic);
+
+    void registerEventListener(EventListener listener);
 }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
index f6d6083..71f921f 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
@@ -59,7 +59,7 @@ public class PushConsumerImpl {
     private final DefaultMQPushConsumer rocketmqPushConsumer;
     private final Properties properties;
     private AtomicBoolean started = new AtomicBoolean(false);
-    private final Map<String, EventListener> subscribeTable = new ConcurrentHashMap<>();
+    private EventListener eventListener;
     private final ClientConfig clientConfig;
 
     public PushConsumerImpl(final Properties properties) {
@@ -134,9 +134,7 @@ public class PushConsumerImpl {
         return rocketmqPushConsumer;
     }
 
-
-    public void subscribe(String topic, String subExpression, EventListener listener) {
-        this.subscribeTable.put(topic, listener);
+    public void subscribe(String topic, String subExpression) {
         try {
             this.rocketmqPushConsumer.subscribe(topic, subExpression);
         } catch (MQClientException e) {
@@ -146,7 +144,6 @@ public class PushConsumerImpl {
 
 
     public void unsubscribe(String topic) {
-        this.subscribeTable.remove(topic);
         try {
             this.rocketmqPushConsumer.unsubscribe(topic);
         } catch (Exception e) {
@@ -197,9 +194,7 @@ public class PushConsumerImpl {
                 cloudEvent = cloudEventBuilder.build();
             }
 
-            EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic());
-
-            if (listener == null) {
+            if (eventListener == null) {
                 throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
                         msg.getTopic()));
             }
@@ -231,7 +226,7 @@ public class PushConsumerImpl {
 
             eventMeshAsyncConsumeContext.setAbstractContext(context);
 
-            listener.consume(cloudEvent, eventMeshAsyncConsumeContext);
+            eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
 
             return EventMeshConsumeConcurrentlyStatus.valueOf(
                     contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
@@ -270,9 +265,7 @@ public class PushConsumerImpl {
                 cloudEvent = cloudEventBuilder.build();
             }
 
-            EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic());
-
-            if (listener == null) {
+            if (eventListener == null) {
                 throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
                         msg.getTopic()));
             }
@@ -306,12 +299,14 @@ public class PushConsumerImpl {
 
             eventMeshAsyncConsumeContext.setAbstractContext(context);
 
-            listener.consume(cloudEvent, eventMeshAsyncConsumeContext);
+            eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
 
             return EventMeshConsumeConcurrentlyStatus.valueOf(
                     contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
         }
     }
 
-
+    public void registerEventListener(EventListener listener) {
+        this.eventListener = listener;
+    }
 }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
index 12fc214..eaf2a42 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
@@ -70,8 +70,8 @@ public class RocketMQConsumerImpl implements Consumer {
     }
 
     @Override
-    public void subscribe(String topic, EventListener listener) throws Exception {
-        pushConsumer.subscribe(topic, "*", listener);
+    public void subscribe(String topic) throws Exception {
+        pushConsumer.subscribe(topic, "*");
     }
 
     @Override
@@ -100,6 +100,11 @@ public class RocketMQConsumerImpl implements Consumer {
     }
 
     @Override
+    public void registerEventListener(EventListener listener) {
+        pushConsumer.registerEventListener(listener);
+    }
+
+    @Override
     public synchronized void shutdown() {
         pushConsumer.shutdown();
     }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
index 31bcd5f..683183b 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
@@ -89,15 +89,7 @@ public class PushConsumerImplTest {
         consumedMsg.setBody(testBody);
         consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
         consumedMsg.setTopic("HELLO_QUEUE");
-        consumer.subscribe("HELLO_QUEUE", "*", new EventListener() {
-
-            @Override
-            public void consume(CloudEvent cloudEvent, org.apache.eventmesh.api.AsyncConsumeContext context) {
-                assertThat(cloudEvent.getExtension("MESSAGE_ID")).isEqualTo("NewMsgId");
-                assertThat(cloudEvent.getData()).isEqualTo(testBody);
-                context.commit(EventMeshAction.CommitMessage);
-            }
-        });
+        consumer.subscribe("HELLO_QUEUE", "*");
         ((MessageListenerConcurrently) rocketmqPushConsumer
                 .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
 
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java
index 225b208..f56fc2b 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java
@@ -37,6 +37,8 @@ public class StandaloneConsumer implements Consumer {
 
     private StandaloneBroker standaloneBroker;
 
+    private EventListener listener;
+
     private AtomicBoolean isStarted;
 
     private final ConcurrentHashMap<String, SubScribeTask> subscribeTaskTable;
@@ -90,10 +92,8 @@ public class StandaloneConsumer implements Consumer {
     }
 
     @Override
-    public void subscribe(String topic, EventListener listener) throws Exception {
-        if (listener == null) {
-            throw new IllegalArgumentException("listener cannot be null");
-        }
+    public void subscribe(String topic) throws Exception {
+
         if (subscribeTaskTable.containsKey(topic)) {
             return;
         }
@@ -116,4 +116,9 @@ public class StandaloneConsumer implements Consumer {
             subscribeTaskTable.remove(topic);
         }
     }
+
+    @Override
+    public void registerEventListener(EventListener listener) {
+        this.listener = listener;
+    }
 }
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
index c09d422..d7d7257 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
@@ -69,8 +69,8 @@ public class StandaloneConsumerAdaptor implements Consumer {
     }
 
     @Override
-    public void subscribe(String topic, EventListener listener) throws Exception {
-        consumer.subscribe(topic, listener);
+    public void subscribe(String topic) throws Exception {
+        consumer.subscribe(topic);
     }
 
     @Override
@@ -78,95 +78,8 @@ public class StandaloneConsumerAdaptor implements Consumer {
         consumer.unsubscribe(topic);
     }
 
-    //@Override
-    //public void init(Properties keyValue) throws Exception {
-    //    String producerGroup = keyValue.getProperty("producerGroup");
-    //
-    //    MessagingAccessPointImpl messagingAccessPoint = new MessagingAccessPointImpl(keyValue);
-    //    consumer = (StandaloneConsumer) messagingAccessPoint.createConsumer(keyValue);
-    //
-    //}
-    //
-    //@Override
-    //public void updateOffset(List<Message> msgs, AbstractContext context) {
-    //    for(Message message : msgs) {
-    //        consumer.updateOffset(message);
-    //    }
-    //}
-    //
-    //@Override
-    //public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
-    //    // todo: support subExpression
-    //    consumer.subscribe(topic, "*", listener);
-    //}
-    //
-    //@Override
-    //public void unsubscribe(String topic) {
-    //    consumer.unsubscribe(topic);
-    //}
-    //
-    //@Override
-    //public void subscribe(String topic, String subExpression, MessageListener listener) {
-    //    throw new UnsupportedOperationException("not supported yet");
-    //}
-    //
-    //@Override
-    //public void subscribe(String topic, MessageSelector selector, MessageListener listener) {
-    //    throw new UnsupportedOperationException("not supported yet");
-    //}
-    //
-    //@Override
-    //public <T> void subscribe(String topic, String subExpression, GenericMessageListener<T> listener) {
-    //    throw new UnsupportedOperationException("not supported yet");
-    //}
-    //
-    //@Override
-    //public <T> void subscribe(String topic, MessageSelector selector, GenericMessageListener<T> listener) {
-    //    throw new UnsupportedOperationException("not supported yet");
-    //}
-    //
-    //@Override
-    //public void subscribe(String topic, String subExpression, AsyncMessageListener listener) {
-    //    throw new UnsupportedOperationException("not supported yet");
-    //}
-    //
-    //@Override
-    //public void subscribe(String topic, MessageSelector selector, AsyncMessageListener listener) {
-    //    throw new UnsupportedOperationException("not supported yet");
-    //}
-    //
-    //@Override
-    //public <T> void subscribe(String topic, String subExpression, AsyncGenericMessageListener<T> listener) {
-    //    throw new UnsupportedOperationException("not supported yet");
-    //}
-    //
-    //@Override
-    //public <T> void subscribe(String topic, MessageSelector selector, AsyncGenericMessageListener<T> listener) {
-    //    throw new UnsupportedOperationException("not supported yet");
-    //}
-    //
-    //@Override
-    //public void updateCredential(Properties credentialProperties) {
-    //
-    //}
-    //
-    //@Override
-    //public boolean isStarted() {
-    //    return consumer.isStarted();
-    //}
-    //
-    //@Override
-    //public boolean isClosed() {
-    //    return consumer.isClosed();
-    //}
-    //
-    //@Override
-    //public void start() {
-    //    consumer.start();
-    //}
-    //
-    //@Override
-    //public void shutdown() {
-    //    consumer.shutdown();
-    //}
+    @Override
+    public void registerEventListener(EventListener listener) {
+        consumer.registerEventListener(listener);
+    }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index edce54a..6bd8875 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -44,8 +44,8 @@ public class MQConsumerWrapper extends MQWrapper {
         }
     }
 
-    public void subscribe(String topic, EventListener listener) throws Exception {
-        meshMQPushConsumer.subscribe(topic, listener);
+    public void subscribe(String topic) throws Exception {
+        meshMQPushConsumer.subscribe(topic);
     }
 
     public void unsubscribe(String topic) throws Exception {
@@ -69,9 +69,9 @@ public class MQConsumerWrapper extends MQWrapper {
         started.compareAndSet(false, true);
     }
 
-    //public void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently) {
-    //    meshMQPushConsumer.registerMessageListener(messageListenerConcurrently);
-    //}
+    public void registerEventListener(EventListener listener) {
+        meshMQPushConsumer.registerEventListener(listener);
+    }
 
     public void updateOffset(List<CloudEvent> events, AbstractContext eventMeshConsumeConcurrentlyContext) {
         meshMQPushConsumer.updateOffset(events, eventMeshConsumeConcurrentlyContext);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
index 6bf6962..6324619 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
@@ -140,6 +140,8 @@ public class EventMeshConsumer {
         keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup,
             eventMeshGrpcConfiguration.eventMeshCluster));
         persistentMqConsumer.init(keyValue);
+        EventListener clusterEventListner = createEventListener(SubscriptionMode.CLUSTERING);
+        persistentMqConsumer.registerEventListener(clusterEventListner);
 
         Properties broadcastKeyValue = new Properties();
         broadcastKeyValue.put("isBroadcast", "true");
@@ -148,6 +150,8 @@ public class EventMeshConsumer {
         broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup,
             eventMeshGrpcConfiguration.eventMeshCluster));
         broadcastMqConsumer.init(broadcastKeyValue);
+        EventListener broadcastEventListner = createEventListener(SubscriptionMode.BROADCASTING);
+        broadcastMqConsumer.registerEventListener(broadcastEventListner);
 
         serviceState = ServiceState.INITED;
         logger.info("EventMeshConsumer [{}] initialized.............", consumerGroup);
@@ -184,9 +188,9 @@ public class EventMeshConsumer {
 
     public void subscribe(String topic, SubscriptionMode subscriptionMode) throws Exception {
         if (SubscriptionMode.CLUSTERING.equals(subscriptionMode)) {
-            persistentMqConsumer.subscribe(topic, createEventListener(subscriptionMode));
+            persistentMqConsumer.subscribe(topic);
         } else if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
-            broadcastMqConsumer.subscribe(topic, createEventListener(subscriptionMode));
+            broadcastMqConsumer.subscribe(topic);
         } else {
             logger.error("Subscribe Failed. Incorrect Subscription Mode");
             throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 96398d4..b4e5386 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -92,7 +92,69 @@ public class EventMeshConsumer {
                 eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster));
         persistentMqConsumer.init(keyValue);
 
-        //
+        EventListener cluserEventListener = new EventListener() {
+            @Override
+            public void consume(CloudEvent event, AsyncConsumeContext context) {
+                String topic = event.getSubject();
+                //String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+                String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS);
+                String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID);
+
+                event = CloudEventBuilder.from(event)
+                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
+                    .build();
+                //message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+                if (messageLogger.isDebugEnabled()) {
+                    messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
+                } else {
+                    messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
+                }
+
+                ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
+                    topic, null);
+                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
+
+                if (currentTopicConfig == null) {
+                    logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
+                    try {
+                        sendMessageBack(event, uniqueId, bizSeqNo);
+                        //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                        // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                        //context.ack();
+                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+                        return;
+                    } catch (Exception ex) {
+                        //ignore
+                    }
+                }
+
+                SubscriptionItem subscriptionItem = consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
+                HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
+                    consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
+                    topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
+                    consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+
+                if (httpMessageHandler.handle(handleMsgContext)) {
+                    //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                    // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+                    //context.ack();
+                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+                } else {
+                    try {
+                        sendMessageBack(event, uniqueId, bizSeqNo);
+                    } catch (Exception e) {
+                        //ignore
+                    }
+                    //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                    // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                    //context.ack();
+                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+                }
+            }
+        };
+        persistentMqConsumer.registerEventListener(cluserEventListener);
+
+        //broacast consumer
         Properties broadcastKeyValue = new Properties();
         broadcastKeyValue.put("isBroadcast", "true");
         broadcastKeyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup());
@@ -100,6 +162,72 @@ public class EventMeshConsumer {
         broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
                 eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster));
         broadcastMqConsumer.init(broadcastKeyValue);
+
+        EventListener broadcastEventListener = new EventListener() {
+            @Override
+            public void consume(CloudEvent event, AsyncConsumeContext context) {
+
+                event = CloudEventBuilder.from(event)
+                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                        String.valueOf(System.currentTimeMillis()))
+                    .build();
+
+                String topic = event.getSubject();
+                String bizSeqNo = event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS).toString();
+                String uniqueId = event.getExtension(Constants.RMB_UNIQ_ID).toString();
+
+                if (messageLogger.isDebugEnabled()) {
+                    messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
+                } else {
+                    messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo,
+                        uniqueId);
+                }
+
+                ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
+                    consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
+
+                if (currentTopicConfig == null) {
+                    logger.error("no topicConfig found, consumerGroup:{} topic:{}",
+                        consumerGroupConf.getConsumerGroup(), topic);
+                    try {
+                        sendMessageBack(event, uniqueId, bizSeqNo);
+                        //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                        // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                        //context.ack();
+                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+                        return;
+                    } catch (Exception ex) {
+                        //ignore
+                    }
+                }
+
+                SubscriptionItem subscriptionItem = consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
+                HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
+                    consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
+                    topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
+                    consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+
+                if (httpMessageHandler.handle(handleMsgContext)) {
+                    //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                    // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+                    //context.ack();
+                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+                } else {
+                    try {
+                        sendMessageBack(event, uniqueId, bizSeqNo);
+                    } catch (Exception e) {
+                        //ignore
+                    }
+                    //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                    // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                    //context.ack();
+                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+                }
+            }
+        };
+        broadcastMqConsumer.registerEventListener(broadcastEventListener);
+
         inited4Persistent.compareAndSet(false, true);
         inited4Broadcast.compareAndSet(false, true);
         logger.info("EventMeshConsumer [{}] inited.............", consumerGroupConf.getConsumerGroup());
@@ -113,130 +241,10 @@ public class EventMeshConsumer {
     }
 
     public void subscribe(String topic, SubscriptionItem subscriptionItem) throws Exception {
-        EventListener listener = null;
         if (!SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
-            listener = new EventListener() {
-                @Override
-                public void consume(CloudEvent event, AsyncConsumeContext context) {
-                    String topic = event.getSubject();
-                    //String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
-                    String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS);
-                    String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID);
-
-                    event = CloudEventBuilder.from(event)
-                            .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
-                            .build();
-                    //message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-                    if (messageLogger.isDebugEnabled()) {
-                        messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
-                    } else {
-                        messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
-                    }
-
-                    ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
-                            topic, null);
-                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
-
-                    if (currentTopicConfig == null) {
-                        logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
-                        try {
-                            sendMessageBack(event, uniqueId, bizSeqNo);
-                            //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                            // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-                            //context.ack();
-                            eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
-                            return;
-                        } catch (Exception ex) {
-                            //ignore
-                        }
-                    }
-                    HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
-                            consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
-                            topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
-                            consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
-
-                    if (httpMessageHandler.handle(handleMsgContext)) {
-                        //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                        // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
-                        //context.ack();
-                        eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
-                    } else {
-                        try {
-                            sendMessageBack(event, uniqueId, bizSeqNo);
-                        } catch (Exception e) {
-                            //ignore
-                        }
-                        //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                        // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-                        //context.ack();
-                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
-                    }
-                }
-            };
-            persistentMqConsumer.subscribe(topic, listener);
+            persistentMqConsumer.subscribe(topic);
         } else {
-            listener = new EventListener() {
-                @Override
-                public void consume(CloudEvent event, AsyncConsumeContext context) {
-
-                    event = CloudEventBuilder.from(event)
-                            .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
-                                    String.valueOf(System.currentTimeMillis()))
-                            .build();
-
-                    String topic = event.getSubject();
-                    String bizSeqNo = event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS).toString();
-                    String uniqueId = event.getExtension(Constants.RMB_UNIQ_ID).toString();
-
-                    if (messageLogger.isDebugEnabled()) {
-                        messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
-                    } else {
-                        messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo,
-                                uniqueId);
-                    }
-
-                    ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
-                            consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
-                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
-
-                    if (currentTopicConfig == null) {
-                        logger.error("no topicConfig found, consumerGroup:{} topic:{}",
-                                consumerGroupConf.getConsumerGroup(), topic);
-                        try {
-                            sendMessageBack(event, uniqueId, bizSeqNo);
-                            //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                            // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-                            //context.ack();
-                            eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
-                            return;
-                        } catch (Exception ex) {
-                            //ignore
-                        }
-                    }
-                    HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
-                            consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
-                            topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
-                            consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
-
-                    if (httpMessageHandler.handle(handleMsgContext)) {
-                        //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                        // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
-                        //context.ack();
-                        eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
-                    } else {
-                        try {
-                            sendMessageBack(event, uniqueId, bizSeqNo);
-                        } catch (Exception e) {
-                            //ignore
-                        }
-                        //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                        // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-                        //context.ack();
-                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
-                    }
-                }
-            };
-            broadcastMqConsumer.subscribe(topic, listener);
+            broadcastMqConsumer.subscribe(topic);
         }
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 7d45219..e6937f1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -17,7 +17,6 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.group;
 
-import org.apache.eventmesh.api.AsyncConsumeContext;
 import org.apache.eventmesh.api.EventListener;
 import org.apache.eventmesh.api.EventMeshAction;
 import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
@@ -104,6 +103,8 @@ public class ClientGroupWrapper {
     private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
         new ConcurrentHashMap<String, Set<Session>>();
 
+    private ConcurrentHashMap<String, SubscriptionItem> subscriptions = new ConcurrentHashMap<>();
+
     public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
 
     private MQProducerWrapper mqProducerWrapper;
@@ -180,7 +181,12 @@ public class ClientGroupWrapper {
         return mqProducerWrapper;
     }
 
-    public boolean addSubscription(String topic, Session session) throws Exception {
+    public boolean addSubscription(SubscriptionItem subscriptionItem, Session session) throws Exception {
+        if (subscriptionItem == null) { // reject early: the topic is read from the item below
+            logger.error("addSubscription param error,subscriptionItem is null,session:{}", session);
+            return false;
+        }
+        String topic = subscriptionItem.getTopic();
         if (session == null || !StringUtils.equalsIgnoreCase(group,
             EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
             logger.error("addSubscription param error,topic:{},session:{}", topic, session);
@@ -203,6 +209,8 @@ public class ClientGroupWrapper {
                     .warn("addSubscription fail, group:{} topic:{} client:{}", group, topic,
                         session.getClient());
             }
+
+            subscriptions.putIfAbsent(topic, subscriptionItem);
         } catch (Exception e) {
             logger
                 .error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
@@ -213,7 +221,12 @@ public class ClientGroupWrapper {
         return r;
     }
 
-    public boolean removeSubscription(String topic, Session session) {
+    public boolean removeSubscription(SubscriptionItem subscriptionItem, Session session) {
+        if (subscriptionItem == null) { // reject early: the topic is read from the item below
+            logger.error("removeSubscription param error,subscriptionItem is null,session:{}", session);
+            return false;
+        }
+        String topic = subscriptionItem.getTopic();
         if (session == null
             || !StringUtils.equalsIgnoreCase(group,
             EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
@@ -238,6 +251,7 @@ public class ClientGroupWrapper {
             }
             if (CollectionUtils.size(topic2sessionInGroupMapping.get(topic)) == 0) {
                 topic2sessionInGroupMapping.remove(topic);
+                subscriptions.remove(topic);
                 logger.info("removeSubscription remove topic success, group:{} topic:{}",
                     group, topic);
             }
@@ -401,6 +415,79 @@ public class ClientGroupWrapper {
 
         persistentMsgConsumer.init(keyValue);
 
+        // Registered once on the consumer rather than per topic: the SubscriptionItem for
+        // each event is resolved from the subscriptions map by the event's subject.
+        EventListener listener = (event, context) -> {
+            eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
+                .incrementAndGet();
+            event = CloudEventBuilder.from(event)
+                .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                    String.valueOf(System.currentTimeMillis()))
+                .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                    eventMeshTCPConfiguration.eventMeshServerIp).build();
+            String topic = event.getSubject();
+
+            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+                (EventMeshAsyncConsumeContext) context;
+            Session session = downstreamDispatchStrategy
+                .select(group, topic, groupConsumerSessions);
+            String bizSeqNo = EventMeshUtil.getMessageBizSeq(event);
+            if (session == null) {
+                try {
+                    // The send-back extensions may be absent, and are written below as
+                    // strings, so read them null-safely and parse rather than cast.
+                    Object timesExt =
+                        event.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
+                    Object ipExt = event.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP);
+                    int sendBackTimes = 0;
+                    if (timesExt != null && StringUtils.isNotBlank(timesExt.toString())) {
+                        sendBackTimes = Integer.parseInt(timesExt.toString().trim());
+                    }
+                    String sendBackFromEventMeshIp = ipExt == null ? "" : ipExt.toString();
+
+                    logger.error(
+                        "found no session to downstream msg,groupName:{}, topic:{}, "
+                            + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
+                        group, topic, bizSeqNo, sendBackTimes,
+                        sendBackFromEventMeshIp);
+
+                    if (sendBackTimes >= eventMeshTCPServer
+                        .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
+                        logger.error(
+                            "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
+                                + "bizSeqNo:{}", eventMeshTCPServer
+                                .getEventMeshTCPConfiguration()
+                                .eventMeshTcpSendBackMaxTimes,
+                            group, topic, bizSeqNo);
+                    } else {
+                        sendBackTimes++;
+                        event = CloudEventBuilder.from(event)
+                            .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
+                                String.valueOf(sendBackTimes))
+                            .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
+                                eventMeshTCPConfiguration.eventMeshServerIp).build();
+                        sendMsgBackToBroker(event, bizSeqNo);
+                    }
+                } catch (Exception e) {
+                    logger.warn("handle msg exception when no session found", e);
+                }
+
+                eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+                return;
+            }
+
+            SubscriptionItem subscriptionItem = subscriptions.get(topic);
+            DownStreamMsgContext downStreamMsgContext =
+                new DownStreamMsgContext(event, session, persistentMsgConsumer,
+                    eventMeshAsyncConsumeContext.getAbstractContext(), false,
+                    subscriptionItem);
+            //msg put in eventmesh,waiting client ack
+            session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
+            session.downstreamMsg(downStreamMsgContext);
+            eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+        };
+        persistentMsgConsumer.registerEventListener(listener);
+
         inited4Persistent.compareAndSet(false, true);
         logger.info("init persistentMsgConsumer success, group:{}", group);
     }
@@ -427,6 +514,66 @@ public class ClientGroupWrapper {
             .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
         broadCastMsgConsumer.init(keyValue);
 
+        // Registered once on the consumer rather than per topic: the SubscriptionItem for
+        // each event is resolved from the subscriptions map by the event's subject.
+        EventListener listener = (event, context) -> {
+
+            eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
+                .incrementAndGet();
+            event = CloudEventBuilder.from(event)
+                .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                    String.valueOf(System.currentTimeMillis()))
+                .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                    eventMeshTCPConfiguration.eventMeshServerIp).build();
+            String topic = event.getSubject();
+
+            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+                (EventMeshAsyncConsumeContext) context;
+            if (CollectionUtils.isEmpty(groupConsumerSessions)) {
+                logger.warn("found no session to downstream broadcast msg");
+                eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+                return;
+            }
+
+            SubscriptionItem subscriptionItem = subscriptions.get(topic);
+            Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
+
+            while (sessionsItr.hasNext()) {
+                Session session = sessionsItr.next();
+
+                if (!session.isAvailable(topic)) {
+                    logger
+                        .warn("downstream broadcast msg,session is not available,client:{}",
+                            session.getClient());
+                    continue;
+                }
+
+                // Build one context per session: the tasks below run asynchronously, so a
+                // single shared context whose session field is reassigned on each iteration
+                // could be observed by a task with another session's value.
+                DownStreamMsgContext downStreamMsgContext =
+                    new DownStreamMsgContext(event, session, broadCastMsgConsumer,
+                        eventMeshAsyncConsumeContext.getAbstractContext(), false,
+                        subscriptionItem);
+
+                //downstream broadcast msg asynchronously
+                eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
+                    .submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            //msg put in eventmesh,waiting client ack
+                            session.getPusher()
+                                .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
+                            session.downstreamMsg(downStreamMsgContext);
+                        }
+                    });
+            }
+
+            // Commit with ManualAck after the downstream tasks have been submitted.
+            eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+        };
+        broadCastMsgConsumer.registerEventListener(listener);
+
         inited4Broadcast.compareAndSet(false, true);
         logger.info("init broadCastMsgConsumer success, group:{}", group);
     }
@@ -441,139 +588,10 @@ public class ClientGroupWrapper {
     }
 
     public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
-        EventListener listener = null;
         if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
-            listener = (event, context) -> {
-
-                eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
-                    .incrementAndGet();
-                event = CloudEventBuilder.from(event)
-                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
-                        String.valueOf(System.currentTimeMillis()))
-                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                        eventMeshTCPConfiguration.eventMeshServerIp).build();
-                String topic = event.getSubject();
-                //    message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
-                //message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
-                //    String.valueOf(System.currentTimeMillis()));
-                //message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                //    eventMeshTCPConfiguration.eventMeshServerIp);
-
-                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
-                    (EventMeshAsyncConsumeContext) context;
-                if (CollectionUtils.isEmpty(groupConsumerSessions)) {
-                    logger.warn("found no session to downstream broadcast msg");
-                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
-                    return;
-                }
-
-                Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
-
-                DownStreamMsgContext downStreamMsgContext =
-                    new DownStreamMsgContext(event, null, broadCastMsgConsumer,
-                        eventMeshAsyncConsumeContext.getAbstractContext(), false,
-                        subscriptionItem);
-
-                while (sessionsItr.hasNext()) {
-                    Session session = sessionsItr.next();
-
-                    if (!session.isAvailable(topic)) {
-                        logger
-                            .warn("downstream broadcast msg,session is not available,client:{}",
-                                session.getClient());
-                        continue;
-                    }
-
-                    downStreamMsgContext.session = session;
-
-                    //downstream broadcast msg asynchronously
-                    eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
-                        .submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                //msg put in eventmesh,waiting client ack
-                                session.getPusher()
-                                    .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
-                                session.downstreamMsg(downStreamMsgContext);
-                            }
-                        });
-                }
-
-                eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
-            };
-            broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
+            broadCastMsgConsumer.subscribe(subscriptionItem.getTopic());
         } else {
-            listener = (event, context) -> {
-                eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
-                    .incrementAndGet();
-                event = CloudEventBuilder.from(event)
-                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
-                        String.valueOf(System.currentTimeMillis()))
-                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                        eventMeshTCPConfiguration.eventMeshServerIp).build();
-                String topic = event.getSubject();
-
-                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
-                    (EventMeshAsyncConsumeContext) context;
-                Session session = downstreamDispatchStrategy
-                    .select(group, topic, groupConsumerSessions);
-                String bizSeqNo = EventMeshUtil.getMessageBizSeq(event);
-                if (session == null) {
-                    try {
-                        Integer sendBackTimes = 0;
-                        String sendBackFromEventMeshIp = "";
-                        if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
-                            EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) {
-                            sendBackTimes = (Integer) event.getExtension(
-                                EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
-                        }
-                        if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
-                            EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) {
-                            sendBackFromEventMeshIp = (String) event.getExtension(
-                                EventMeshConstants.EVENTMESH_SEND_BACK_IP);
-                        }
-
-                        logger.error(
-                            "found no session to downstream msg,groupName:{}, topic:{}, "
-                                + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
-                            group, topic, bizSeqNo, sendBackTimes,
-                            sendBackFromEventMeshIp);
-
-                        if (sendBackTimes >= eventMeshTCPServer
-                            .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
-                            logger.error(
-                                "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
-                                    + "bizSeqNo:{}", eventMeshTCPServer
-                                    .getEventMeshTCPConfiguration()
-                                    .eventMeshTcpSendBackMaxTimes,
-                                group, topic, bizSeqNo);
-                        } else {
-                            sendBackTimes++;
-                            event = CloudEventBuilder.from(event)
-                                .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
-                                    sendBackTimes.toString())
-                                .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
-                                    eventMeshTCPConfiguration.eventMeshServerIp).build();
-                            sendMsgBackToBroker(event, bizSeqNo);
-                        }
-                    } catch (Exception e) {
-                        logger.warn("handle msg exception when no session found", e);
-                    }
-
-                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
-                    return;
-                }
-
-                DownStreamMsgContext downStreamMsgContext =
-                    new DownStreamMsgContext(event, session, persistentMsgConsumer,
-                        eventMeshAsyncConsumeContext.getAbstractContext(), false,
-                        subscriptionItem);
-                //msg put in eventmesh,waiting client ack
-                session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
-                session.downstreamMsg(downStreamMsgContext);
-                eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
-            };
-            persistentMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
+            persistentMsgConsumer.subscribe(subscriptionItem.getTopic());
         }
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 003186e..2ba749f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -283,7 +283,7 @@ public class ClientSessionGroupMapping {
      */
     private void cleanSubscriptionInSession(Session session) throws Exception {
         for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) {
-            session.getClientGroupWrapper().get().removeSubscription(item.getTopic(), session);
+            session.getClientGroupWrapper().get().removeSubscription(item, session);
             if (!session.getClientGroupWrapper().get().hasSubscription(item.getTopic())) {
                 session.getClientGroupWrapper().get().unsubscribe(item);
             }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 37e7b5e..09b7396 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -171,7 +171,7 @@ public class Session {
 
             clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().checkTopicExist(item.getTopic());
 
-            clientGroupWrapper.get().addSubscription(item.getTopic(), this);
+            clientGroupWrapper.get().addSubscription(item, this);
             subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
         }
     }
@@ -179,7 +179,7 @@ public class Session {
     public void unsubscribe(List<SubscriptionItem> items) throws Exception {
         for (SubscriptionItem item : items) {
             sessionContext.subscribeTopics.remove(item.getTopic());
-            clientGroupWrapper.get().removeSubscription(item.getTopic(), this);
+            clientGroupWrapper.get().removeSubscription(item, this);
 
             if (!clientGroupWrapper.get().hasSubscription(item.getTopic())) {
                 clientGroupWrapper.get().unsubscribe(item);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org