You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/06/21 12:37:18 UTC

[incubator-eventmesh] branch develop updated: [ISSUE #391] Optimize interface design in eventmesh-connector-api (#392)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new d39ea29  [ISSUE #391] Optimize interface design in eventmesh-connector-api (#392)
d39ea29 is described below

commit d39ea29317eaa9a8d6201fff1ff26f6da5593151
Author: lrhkobe <34...@users.noreply.github.com>
AuthorDate: Mon Jun 21 20:37:11 2021 +0800

    [ISSUE #391] Optimize interface design in eventmesh-connector-api (#392)
    
    * modify:optimize flow control in downstreaming msg
    
    * modify:optimize stategy of selecting session in downstream msg
    
    * modify:optimize msg downstream,msg store in session
    
    * modify:fix bug:not a @Sharable handler
    
    * modify:downstream broadcast msg asynchronously
    
    * modify:remove unneccessary interface in eventmesh-connector-api
    
    * modify:fix conflict
    
    * modify:add license in EventMeshAction
    close #391
---
 ...yncConsumeContext.java => EventMeshAction.java} | 15 ++-----
 ...text.java => EventMeshAsyncConsumeContext.java} | 24 +++++++----
 .../eventmesh/api/consumer/MeshMQPushConsumer.java |  5 ---
 .../eventmesh/api/producer/MeshMQProducer.java     |  9 +---
 .../rocketmq/consumer/PushConsumerImpl.java        | 49 +++++++++++++++-------
 .../rocketmq/producer/RocketMQProducerImpl.java    | 17 +-------
 .../rocketmq/consumer/PushConsumerImplTest.java    |  4 +-
 .../runtime/core/plugin/MQConsumerWrapper.java     |  8 ----
 .../protocol/http/consumer/EventMeshConsumer.java  | 22 +++++-----
 .../http/processor/SendSyncMessageProcessor.java   |  2 +-
 .../tcp/client/group/ClientGroupWrapper.java       | 18 ++++----
 .../core/protocol/tcp/client/session/Session.java  |  3 +-
 .../tcp/client/task/MessageTransferTask.java       |  5 ---
 13 files changed, 84 insertions(+), 97 deletions(-)

diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
similarity index 71%
copy from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java
copy to eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
index d6b2aa4..4fda6d0 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
@@ -14,19 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.eventmesh.api;
 
-import io.openmessaging.api.AsyncConsumeContext;
-
-public abstract class MeshAsyncConsumeContext extends AsyncConsumeContext {
-    private AbstractContext context;
+public enum EventMeshAction {
+    CommitMessage,
 
-    public AbstractContext getContext() {
-        return context;
-    }
+    ReconsumeLater,
 
-    public void setContext(AbstractContext context) {
-        this.context = context;
-    }
+    ManualAck
 }
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
similarity index 62%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java
rename to eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
index d6b2aa4..c7e4e7f 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
@@ -14,19 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.eventmesh.api;
 
+import io.openmessaging.api.Action;
 import io.openmessaging.api.AsyncConsumeContext;
 
-public abstract class MeshAsyncConsumeContext extends AsyncConsumeContext {
-    private AbstractContext context;
+public abstract class EventMeshAsyncConsumeContext extends AsyncConsumeContext {
+
+    private AbstractContext abstractContext;
 
-    public AbstractContext getContext() {
-        return context;
+    public AbstractContext getAbstractContext() {
+        return abstractContext;
     }
 
-    public void setContext(AbstractContext context) {
-        this.context = context;
+    public void setAbstractContext(AbstractContext abstractContext) {
+        this.abstractContext = abstractContext;
+    }
+
+    public abstract void commit(EventMeshAction action);
+
+    @Override
+    public void commit(Action action) {
+        throw new UnsupportedOperationException("not support yet");
     }
-}
+}
\ No newline at end of file
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
index 172a674..5e60e0e 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
@@ -30,11 +30,6 @@ public interface MeshMQPushConsumer extends Consumer {
 
     void init(Properties keyValue) throws Exception;
 
-    @Override
-    void start();
-
-//    void updateOffset(List<MessageExt> msgs, ConsumeConcurrentlyContext context);
-
     void updateOffset(List<Message> msgs, AbstractContext context);
 
 //    void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently);
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
index 929654f..82ca583 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
@@ -29,9 +29,6 @@ public interface MeshMQProducer extends Producer {
 
     void init(Properties properties) throws Exception;
 
-    @Override
-    void start();
-
     void send(Message message, SendCallback sendCallback) throws Exception;
 
     void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout) throws Exception;
@@ -40,12 +37,8 @@ public interface MeshMQProducer extends Producer {
 
     boolean reply(final Message message, final SendCallback sendCallback) throws Exception;
 
-    MeshMQProducer getMeshMQProducer();
-
-    String buildMQClientId();
+    void checkTopicExist(String topic) throws Exception;
 
     void setExtFields();
 
-    void getDefaultTopicRouteInfoFromNameServer(String topic, long timeout) throws Exception;
-
 }
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
index bbdbd86..47565e3 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
+++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
@@ -21,9 +21,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.openmessaging.api.Action;
-import io.openmessaging.api.AsyncConsumeContext;
 import io.openmessaging.api.AsyncGenericMessageListener;
 import io.openmessaging.api.AsyncMessageListener;
 import io.openmessaging.api.Consumer;
@@ -32,9 +29,8 @@ import io.openmessaging.api.Message;
 import io.openmessaging.api.MessageListener;
 import io.openmessaging.api.MessageSelector;
 import io.openmessaging.api.exception.OMSRuntimeException;
-
-import org.apache.eventmesh.api.AbstractContext;
-import org.apache.eventmesh.api.MeshAsyncConsumeContext;
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
 import org.apache.eventmesh.connector.rocketmq.config.ClientConfig;
@@ -116,13 +112,25 @@ public class PushConsumerImpl implements Consumer {
 
                     final Properties contextProperties = new Properties();
                     contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
-                    MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
+                    EventMeshAsyncConsumeContext omsContext = new EventMeshAsyncConsumeContext() {
                         @Override
-                        public void commit(Action action) {
-                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                        public void commit(EventMeshAction action) {
+                            switch (action){
+                                case CommitMessage:
+                                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                                    break;
+                                case ReconsumeLater:
+                                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+                                    break;
+                                case ManualAck:
+                                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+                                    break;
+                                default:
+                                    break;
+                            }
                         }
                     };
-                    omsContext.setContext(context);
+                    omsContext.setAbstractContext(context);
                     listener.consume(omsMsg, omsContext);
 
                     return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
@@ -156,14 +164,25 @@ public class PushConsumerImpl implements Consumer {
 
                     contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
 
-                    MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
+                    EventMeshAsyncConsumeContext omsContext = new EventMeshAsyncConsumeContext() {
                         @Override
-                        public void commit(Action action) {
-                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                                    EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                        public void commit(EventMeshAction action) {
+                            switch (action) {
+                                case CommitMessage:
+                                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                                    break;
+                                case ReconsumeLater:
+                                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+                                    break;
+                                case ManualAck:
+                                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+                                    break;
+                                default:
+                                    break;
+                            }
                         }
                     };
-                    omsContext.setContext(context);
+                    omsContext.setAbstractContext(context);
                     listener.consume(omsMsg, omsContext);
 
                     return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
index 6364650..72e2093 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
+++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
@@ -87,7 +87,6 @@ public class RocketMQProducerImpl implements MeshMQProducer {
         producer.start();
     }
 
-
     @Override
     public synchronized void shutdown() {
         producer.shutdown();
@@ -115,13 +114,8 @@ public class RocketMQProducerImpl implements MeshMQProducer {
     }
 
     @Override
-    public MeshMQProducer getMeshMQProducer() {
-        return this;
-    }
-
-    @Override
-    public String buildMQClientId() {
-        return producer.getRocketmqProducer().buildMQClientId();
+    public void checkTopicExist(String topic) throws Exception {
+        this.producer.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic, EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
     }
 
     @Override
@@ -130,13 +124,6 @@ public class RocketMQProducerImpl implements MeshMQProducer {
     }
 
     @Override
-    public void getDefaultTopicRouteInfoFromNameServer(String topic, long timeout) throws Exception {
-        producer.getRocketmqProducer().getDefaultMQProducerImpl()
-                .getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic,
-                timeout);
-    }
-
-    @Override
     public SendResult send(Message message) {
         return producer.send(message);
     }
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java b/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
index 186ef4a..1140535 100644
--- a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
+++ b/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
@@ -32,6 +32,8 @@ import io.openmessaging.api.MessagingAccessPoint;
 import io.openmessaging.api.OMS;
 import io.openmessaging.api.OMSBuiltinKeys;
 
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
 import org.apache.eventmesh.connector.rocketmq.consumer.PushConsumerImpl;
 import org.apache.eventmesh.connector.rocketmq.domain.NonStandardKeys;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -95,7 +97,7 @@ public class PushConsumerImplTest {
             public void consume(Message message, AsyncConsumeContext context) {
                 assertThat(message.getSystemProperties("MESSAGE_ID")).isEqualTo("NewMsgId");
                 assertThat(message.getBody()).isEqualTo(testBody);
-                context.commit(Action.CommitMessage);
+                ((EventMeshAsyncConsumeContext)context).commit(EventMeshAction.CommitMessage);
             }
         });
         ((MessageListenerConcurrently) rocketmqPushConsumer
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index 39b2c63..080b7af 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -43,14 +43,6 @@ public class MQConsumerWrapper extends MQWrapper {
         meshMQPushConsumer.unsubscribe(topic);
     }
 
-//    public boolean isPause() {
-//        return meshMQPushConsumer.isPause();
-//    }
-//
-//    public void pause() {
-//        meshMQPushConsumer.pause();
-//    }
-
     public synchronized void init(Properties keyValue) throws Exception {
         meshMQPushConsumer = getMeshMQPushConsumer();
         if (meshMQPushConsumer == null) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 8a1049b..8620e68 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -21,7 +21,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import io.openmessaging.api.Action;
 import io.openmessaging.api.AsyncConsumeContext;
 import io.openmessaging.api.AsyncMessageListener;
 import io.openmessaging.api.Message;
@@ -31,7 +30,8 @@ import io.openmessaging.api.SendResult;
 
 import org.apache.commons.collections4.MapUtils;
 import org.apache.eventmesh.api.AbstractContext;
-import org.apache.eventmesh.api.MeshAsyncConsumeContext;
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.SubscriptionMode;
@@ -125,6 +125,7 @@ public class EventMeshConsumer {
                     }
 
                     ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
 
                     if (currentTopicConfig == null) {
                         logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
@@ -132,18 +133,18 @@ public class EventMeshConsumer {
                             sendMessageBack(message, uniqueId, bizSeqNo);
 //                            context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
 //                            context.ack();
-                            context.commit(Action.CommitMessage);
+                            eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                             return;
                         } catch (Exception ex) {
                         }
                     }
                     HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
-                            topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+                            topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
 
                     if (httpMessageHandler.handle(handleMsgContext)) {
 //                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
 //                        context.ack();
-                        context.commit(Action.CommitMessage);
+                        eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                     } else {
                         try {
                             sendMessageBack(message, uniqueId, bizSeqNo);
@@ -152,7 +153,7 @@ public class EventMeshConsumer {
                         }
 //                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
 //                        context.ack();
-                        context.commit(Action.CommitMessage);
+                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                     }
                 }
             };
@@ -174,6 +175,7 @@ public class EventMeshConsumer {
                     }
 
                     ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
 
                     if (currentTopicConfig == null) {
                         logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
@@ -181,18 +183,18 @@ public class EventMeshConsumer {
                             sendMessageBack(message, uniqueId, bizSeqNo);
 //                            context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
 //                            context.ack();
-                            context.commit(Action.CommitMessage);
+                            eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                             return;
                         } catch (Exception ex) {
                         }
                     }
                     HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
-                            topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+                            topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
 
                     if (httpMessageHandler.handle(handleMsgContext)) {
 //                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
 //                        context.ack();
-                        context.commit(Action.CommitMessage);
+                        eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                     } else {
                         try {
                             sendMessageBack(message, uniqueId, bizSeqNo);
@@ -201,7 +203,7 @@ public class EventMeshConsumer {
                         }
 //                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
 //                        context.ack();
-                        context.commit(Action.CommitMessage);
+                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                     }
                 }
             };
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 5ee12d3..5511e38 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -138,7 +138,7 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
             omsMsg.putUserProperties("msgType", "persistent");
             omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
             omsMsg.putUserProperties(Constants.RMB_UNIQ_ID, sendMessageRequestBody.getUniqueId());
-            omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
+//            omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
 
             if (messageLogger.isDebugEnabled()) {
                 messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index a297935..bbd62c0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.alibaba.fastjson.JSON;
 
-import io.openmessaging.api.Action;
 import io.openmessaging.api.AsyncConsumeContext;
 import io.openmessaging.api.AsyncMessageListener;
 import io.openmessaging.api.Message;
@@ -40,7 +39,8 @@ import io.openmessaging.api.SendResult;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.api.MeshAsyncConsumeContext;
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
 import org.apache.eventmesh.api.RRCallback;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
@@ -536,18 +536,19 @@ public class ClientGroupWrapper {
                     message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                     message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
 
+                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
                     if (CollectionUtils.isEmpty(groupConsumerSessions)) {
                         logger.warn("found no session to downstream broadcast msg");
 //                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
 //                        context.ack();
-                        context.commit(Action.CommitMessage);
+                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                         return;
                     }
 
                     Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
 
                     DownStreamMsgContext downStreamMsgContext =
-                            new DownStreamMsgContext(message, null, broadCastMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem);
+                            new DownStreamMsgContext(message, null, broadCastMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, subscriptionItem);
 
                     while (sessionsItr.hasNext()) {
                         Session session = sessionsItr.next();
@@ -572,7 +573,7 @@ public class ClientGroupWrapper {
 
 //                    context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
 //                    context.ack();
-                    context.commit(Action.CommitMessage);
+                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                 }
             };
             broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
@@ -585,6 +586,7 @@ public class ClientGroupWrapper {
                     message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                     message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
 
+                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
                     Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions);
                     String bizSeqNo = EventMeshUtil.getMessageBizSeq(message);
                     if (session == null) {
@@ -614,18 +616,18 @@ public class ClientGroupWrapper {
 
 //                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
 //                        context.ack();
-                        context.commit(Action.CommitMessage);
+                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                         return;
                     }
 
                     DownStreamMsgContext downStreamMsgContext =
-                            new DownStreamMsgContext(message, session, persistentMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem);
+                            new DownStreamMsgContext(message, session, persistentMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, subscriptionItem);
                     //msg put in eventmesh,waiting client ack
                     session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
                     session.downstreamMsg(downStreamMsgContext);
 //                    context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
 //                    context.ack();
-                    context.commit(Action.CommitMessage);
+                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                 }
             };
             persistentMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 8f4e72a..280ff3b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -166,8 +166,7 @@ public class Session {
             sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item);
             clientGroupWrapper.get().subscribe(item);
 
-            clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().getDefaultTopicRouteInfoFromNameServer(item.getTopic(),
-                    EventMeshConstants.DEFAULT_TIME_OUT_MILLS);
+            clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().checkTopicExist(item.getTopic());
 
             clientGroupWrapper.get().addSubscription(item.getTopic(), this);
             subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index 2d4b80d..028214d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -84,11 +84,6 @@ public class MessageTransferTask extends AbstractTask {
             synchronized (session) {
                 long sendTime = System.currentTimeMillis();
                 addTimestamp(eventMeshMessage, cmd, sendTime);
-                if (cmd.equals(Command.REQUEST_TO_SERVER)) {
-                    //Message Attach SYNC
-                    eventMeshMessage.getProperties().put(EventMeshConstants.PROPERTY_MESSAGE_REPLY_TO, session.getClientGroupWrapper()
-                            .get().getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
-                }
 
                 sendStatus = session.upstreamMsg(pkg.getHeader(), EventMeshUtil.decodeMessage(eventMeshMessage), createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org