You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/06/17 07:09:31 UTC

[incubator-eventmesh] branch develop updated: [ISSUE #366 ] remove custom-format topic concept (#388)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0ffe74e  [ISSUE #366 ] remove custom-format topic concept (#388)
0ffe74e is described below

commit 0ffe74e660368db1cab6d9d9c55cca206c29384f
Author: nanoxiong <xi...@163.com>
AuthorDate: Thu Jun 17 15:08:26 2021 +0800

    [ISSUE #366 ] remove custom-format topic concept (#388)
    
    * remove custom-format topic concept
    
    * remove custom-format topic concept
    
    * remove custom-format topic concept
    
    * remove custom-format topic concept
    
    * remove custom-format topic concept
    
    * remove custom-format topic concept
    
    * remove custom-format topic concept
---
 .../java/org/apache/eventmesh/common/IPUtil.java   |  2 +-
 .../eventmesh/common/protocol/SubcriptionType.java | 26 +++++++-
 .../Subscription.java => SubscriptionItem.java}    | 51 ++++++++++-----
 .../common/protocol/SubscriptionMode.java          | 28 ++++++++-
 .../http/body/client/HeartbeatRequestBody.java     |  1 -
 .../protocol/http/body/client/RegRequestBody.java  |  9 +--
 .../http/body/client/SubscribeRequestBody.java     | 15 +++--
 .../common/protocol/tcp/Subscription.java          | 12 ++--
 .../runtime/boot/EventMeshHTTPServer.java          |  4 +-
 .../eventmesh/runtime/boot/EventMeshServer.java    |  1 -
 .../runtime/constants/DeFiBusConstant.java         | 72 ----------------------
 .../core/consumergroup/ConsumerGroupTopicConf.java | 18 +++++-
 .../http/consumer/ConsumerGroupManager.java        |  6 +-
 .../protocol/http/consumer/ConsumerManager.java    |  1 +
 .../protocol/http/consumer/EventMeshConsumer.java  | 18 +++---
 .../protocol/http/consumer/HandleMsgContext.java   | 18 +++++-
 .../http/processor/SubscribeProcessor.java         | 23 ++++---
 .../http/processor/UnSubscribeProcessor.java       |  1 +
 .../protocol/http/push/AsyncHTTPPushRequest.java   |  3 +-
 .../tcp/client/group/ClientGroupWrapper.java       | 22 ++++---
 .../client/group/ClientSessionGroupMapping.java    | 12 ++--
 .../core/protocol/tcp/client/session/Session.java  | 33 +++++-----
 .../tcp/client/session/SessionContext.java         |  5 +-
 .../client/session/push/DownStreamMsgContext.java  |  8 ++-
 .../tcp/client/session/push/SessionPusher.java     | 13 ++--
 .../session/push/retry/EventMeshTcpRetryer.java    |  7 ++-
 .../tcp/client/task/MessageTransferTask.java       |  1 +
 .../protocol/tcp/client/task/SubscribeTask.java    | 21 +++----
 .../protocol/tcp/client/task/UnSubscribeTask.java  |  8 ++-
 .../eventmesh/runtime/util/EventMeshUtil.java      | 58 -----------------
 .../src/test/java/client/EventMeshClient.java      | 10 ++-
 .../src/test/java/client/SubClient.java            | 10 ++-
 .../src/test/java/client/common/MessageUtils.java  | 38 ++++++------
 .../test/java/client/impl/EventMeshClientImpl.java | 17 ++---
 .../src/test/java/client/impl/SubClientImpl.java   | 37 +++++------
 .../src/test/java/demo/AsyncSubClient.java         |  4 +-
 .../src/test/java/demo/BroadCastSubClient.java     |  4 +-
 .../src/test/java/demo/CCSubClient.java            |  4 +-
 .../src/test/java/demo/CClientDemo.java            |  6 +-
 .../src/test/java/demo/SyncSubClient.java          |  4 +-
 .../client/http/consumer/LiteConsumer.java         | 29 ++++++---
 .../eventmesh/client/tcp/EventMeshClient.java      |  4 +-
 .../eventmesh/client/tcp/SimpleSubClient.java      |  4 +-
 .../eventmesh/client/tcp/common/MessageUtils.java  | 19 +++---
 .../client/tcp/impl/DefaultEventMeshClient.java    |  6 +-
 .../client/tcp/impl/SimpleSubClientImpl.java       | 22 ++++---
 .../eventmesh/client/tcp/demo/AsyncSubscribe.java  |  4 +-
 .../client/tcp/demo/AsyncSubscribeBroadcast.java   |  4 +-
 .../eventmesh/client/tcp/demo/SyncResponse.java    |  4 +-
 .../http/demo/sub/service/SubService.java          | 12 +++-
 .../apache/eventmesh/tcp/demo/AsyncSubscribe.java  |  4 +-
 .../tcp/demo/AsyncSubscribeBroadcast.java          |  4 +-
 .../apache/eventmesh/tcp/demo/SyncResponse.java    |  4 +-
 53 files changed, 382 insertions(+), 369 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java
index 4cd19e4..5165deb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java
@@ -35,7 +35,7 @@ public class IPUtil {
     public static String getLocalAddress() {
         // if the progress works under docker environment
         // return the host ip about this docker located from environment value
-        String dockerHostIp = System.getenv("webank_docker_host_ip");
+        String dockerHostIp = System.getenv("docker_host_ip");
         if (dockerHostIp != null && !"".equals(dockerHostIp))
             return dockerHostIp;
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java
similarity index 67%
copy from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java
copy to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java
index f06ba02..6b6cb16 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java
@@ -15,7 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.consumergroup.event;
+package org.apache.eventmesh.common.protocol;
 
-public class ConsumerGroupInstanceChangeEvent {
+public enum SubcriptionType {
+    /**
+     * SYNC
+     */
+    SYNC("SYNC"),
+    /**
+     * ASYNC
+     */
+    ASYNC("ASYNC");
+
+    private String type;
+
+    SubcriptionType(String type) {
+        this.type = type;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java
similarity index 50%
copy from eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
copy to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java
index 83b0dc9..abcc22c 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java
@@ -15,36 +15,57 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.common.protocol.tcp;
+package org.apache.eventmesh.common.protocol;
 
-import java.util.LinkedList;
-import java.util.List;
+public class SubscriptionItem {
 
-public class Subscription {
+    private String topic;
 
-    private List<String> topicList = new LinkedList<>();
+    private SubscriptionMode mode;
 
-    public Subscription() {
+    private SubcriptionType type;
+
+    public SubscriptionItem() {
+    }
+
+    public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) {
+        this.topic = topic;
+        this.mode = mode;
+        this.type = type;
     }
 
-    public Subscription(List<String> topicList) {
-        this.topicList = topicList;
+    public SubcriptionType getType() {
+        return type;
     }
 
-    public List<String> getTopicList() {
-        return topicList;
+    public void setType(SubcriptionType type) {
+        this.type = type;
     }
 
-    public void setTopicList(List<String> topicList) {
-        this.topicList = topicList;
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public SubscriptionMode getMode() {
+        return mode;
+    }
+
+    public void setMode(SubscriptionMode mode) {
+        this.mode = mode;
     }
 
     @Override
     public String toString() {
-        return "Subscription{" +
-                "topicList=" + topicList +
+        return "SubscriptionItem{" +
+                "topic=" + topic +
+                ", mode=" + mode +
+                ", type=" + type +
                 '}';
     }
+}
 
 
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java
similarity index 64%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java
rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java
index f06ba02..ad4b751 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java
@@ -15,7 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.consumergroup.event;
+package org.apache.eventmesh.common.protocol;
+
+public enum SubscriptionMode {
+
+    /**
+     * broadcast
+     */
+    BROADCASTING("BROADCASTING"),
+    /**
+     * clustering
+     */
+    CLUSTERING("CLUSTERING");
+
+    private String mode;
+
+    SubscriptionMode(String mode) {
+        this.mode = mode;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
 
-public class ConsumerGroupInstanceChangeEvent {
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
index 5c3d1ea..ace4d71 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
@@ -32,7 +32,6 @@ public class HeartbeatRequestBody extends Body {
     public static final String CLIENTTYPE = "clientType";
     public static final String HEARTBEATENTITIES = "heartbeatEntities";
 
-
     private String clientType;
 
     private List<HeartbeatEntity> heartbeatEntities;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
index 41c5c5a..78bb684 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
@@ -25,6 +25,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 
 import org.apache.commons.collections4.MapUtils;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.http.body.Body;
 
 public class RegRequestBody extends Body {
@@ -39,13 +40,13 @@ public class RegRequestBody extends Body {
 
     private String endPoint;
 
-    private List<String> topics;
+    private List<SubscriptionItem> topics;
 
-    public List<String> getTopics() {
+    public List<SubscriptionItem> getTopics() {
         return topics;
     }
 
-    public void setTopics(List<String> topics) {
+    public void setTopics(List<SubscriptionItem> topics) {
         this.topics = topics;
     }
 
@@ -69,7 +70,7 @@ public class RegRequestBody extends Body {
         RegRequestBody body = new RegRequestBody();
         body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
         body.setEndPoint(MapUtils.getString(bodyParam, ENDPOINT));
-        body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), String.class));
+        body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), SubscriptionItem.class));
         return body;
     }
 
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
index 6a37cc5..991b834 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
@@ -25,6 +25,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 
 import org.apache.commons.collections4.MapUtils;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.http.body.Body;
 
 public class SubscribeRequestBody extends Body {
@@ -33,20 +34,18 @@ public class SubscribeRequestBody extends Body {
 
     public static final String URL = "url";
 
-    private List<String> topics;
+    private List<SubscriptionItem> topics;
 
-    private String url;
-
-    private String topic;
-
-    public List<String> getTopics() {
+    public List<SubscriptionItem> getTopics() {
         return topics;
     }
 
-    public void setTopics(List<String> topics) {
+    public void setTopics(List<SubscriptionItem> topics) {
         this.topics = topics;
     }
 
+    private String url;
+
     public String getUrl() {
         return url;
     }
@@ -58,7 +57,7 @@ public class SubscribeRequestBody extends Body {
     public static SubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
         SubscribeRequestBody body = new SubscribeRequestBody();
         body.setUrl(MapUtils.getString(bodyParam, URL));
-        body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), String.class));
+        body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), SubscriptionItem.class));
         return body;
     }
 
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
index 83b0dc9..3fb74b9 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
@@ -17,25 +17,27 @@
 
 package org.apache.eventmesh.common.protocol.tcp;
 
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+
 import java.util.LinkedList;
 import java.util.List;
 
 public class Subscription {
 
-    private List<String> topicList = new LinkedList<>();
+    private List<SubscriptionItem> topicList = new LinkedList<>();
 
     public Subscription() {
     }
 
-    public Subscription(List<String> topicList) {
+    public Subscription(List<SubscriptionItem> topicList) {
         this.topicList = topicList;
     }
 
-    public List<String> getTopicList() {
+    public List<SubscriptionItem> getTopicList() {
         return topicList;
     }
 
-    public void setTopicList(List<String> topicList) {
+    public void setTopicList(List<SubscriptionItem> topicList) {
         this.topicList = topicList;
     }
 
@@ -45,6 +47,4 @@ public class Subscription {
                 "topicList=" + topicList +
                 '}';
     }
-
-
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 80540c0..84a2ab7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -54,9 +54,9 @@ public class EventMeshHTTPServer extends AbrstractHTTPServer {
 
     private EventMeshHTTPConfiguration eventMeshHttpConfiguration;
 
-    public final ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();
+    public final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();
 
-    public final ConcurrentHashMap<String, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();
+    public final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();
 
     public EventMeshHTTPServer(EventMeshServer eventMeshServer,
                                EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
index 61bfb89..19d6953 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
@@ -54,7 +54,6 @@ public class EventMeshServer {
 
         String eventstore = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, System.getenv(EventMeshConstants.EVENT_STORE_ENV));
         logger.info("eventstore : {}", eventstore);
-//        logger.info("load custom {} class for eventMesh", ConsumeMessageConcurrentlyService.class.getCanonicalName());
 
         serviceState = ServiceState.INITED;
         logger.info("server state:{}", serviceState);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java
deleted file mode 100644
index 898854d..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to Apache Software Foundation (ASF) under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Apache Software Foundation (ASF) licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.eventmesh.runtime.constants;
-
-//TODO
-public class DeFiBusConstant {
-    public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO";  //requester clientId
-
-    public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID";
-
-    public static final String PROPERTY_MESSAGE_TTL = "TTL";    //timeout for request-response
-
-    public static final String PROPERTY_MESSAGE_CLUSTER = "CLUSTER";    //cluster name
-
-    public static final String PROPERTY_MESSAGE_BROKER = "BROKER";  //broker name where message stored
-
-    public static final String REDIRECT = "REDIRECT";
-
-    public static final String REDIRECT_FLAG = "REDIRECT_FLAG";
-
-    public static final String PLUGIN_CLASS_NAME = "org.apache.defibus.broker.plugin.DeFiPluginMessageStore";
-
-    public static final String RR_REPLY_TOPIC = "rr-reply-topic";   //post fix for reply topic
-
-    public static final String KEY = "msgType";
-
-    public static final String DEFAULT_TTL = "14400000";
-
-    public static final String EXT_CONSUMER_GROUP = "ExtConsumerGroup";
-
-    public static final String RMQ_SYS = "RMQ_SYS_";
-
-    /**
-     * msgType1: indicate the msg is broadcast message
-     */
-    public static final String DIRECT = "direct";
-
-    /**
-     * msgType2: msg of type except broadcast and reply
-     */
-    public static final String PERSISTENT = "persistent";
-
-    /**
-     * msgType3: indicate the msg is which consumer reply to producer
-     */
-    public static final String REPLY = "reply";
-
-    public static final String INSTANCE_NAME_SEPERATER = "#";
-
-    public static final String IDC_SEPERATER = "-";
-
-    public static final String LEAVE_TIME = "LEAVE_TIME";            //leaveBrokerTime
-    public static final String ARRIVE_TIME = "ARRIVE_TIME";
-    public static final String STORE_TIME = "STORE_TIME";
-
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
index 5a16c4c..f29d90a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +38,11 @@ public class ConsumerGroupTopicConf {
     private String topic;
 
     /**
+     * @see org.apache.eventmesh.common.protocol.SubscriptionItem
+     */
+    private SubscriptionItem subscriptionItem;
+
+    /**
      * PUSH URL
      */
     private Map<String /** IDC*/, List<String> /** IDC内URL列表*/> idcUrls = Maps.newConcurrentMap();
@@ -53,12 +59,13 @@ public class ConsumerGroupTopicConf {
         ConsumerGroupTopicConf that = (ConsumerGroupTopicConf) o;
         return consumerGroup.equals(that.consumerGroup) &&
                 Objects.equals(topic, that.topic) &&
+                Objects.equals(subscriptionItem, that.subscriptionItem) &&
                 Objects.equals(idcUrls, that.idcUrls);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(consumerGroup, topic, idcUrls);
+        return Objects.hash(consumerGroup, topic, subscriptionItem, idcUrls);
     }
 
     @Override
@@ -66,6 +73,7 @@ public class ConsumerGroupTopicConf {
         StringBuilder sb = new StringBuilder();
         sb.append("consumeTopicConfig={consumerGroup=").append(consumerGroup)
                 .append(",topic=").append(topic)
+                .append(",subscriptionMode=").append(subscriptionItem)
                 .append(",idcUrls=").append(idcUrls).append("}");
         return sb.toString();
     }
@@ -86,6 +94,14 @@ public class ConsumerGroupTopicConf {
         this.topic = topic;
     }
 
+    public SubscriptionItem getSubscriptionItem() {
+        return subscriptionItem;
+    }
+
+    public void setSubscriptionItem(SubscriptionItem subscriptionItem) {
+        this.subscriptionItem = subscriptionItem;
+    }
+
     public Map<String, List<String>> getIdcUrls() {
         return idcUrls;
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java
index 76381e3..c73523b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java
@@ -17,10 +17,12 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.consumer;
 
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
 
 public class ConsumerGroupManager {
 
@@ -52,8 +54,8 @@ public class ConsumerGroupManager {
     }
 
     private synchronized void setupEventMeshConsumer(ConsumerGroupConf consumerGroupConfig) throws Exception {
-        for (String topic : consumerGroupConfig.getConsumerGroupTopicConf().keySet()) {
-            eventMeshConsumer.subscribe(topic);
+        for (Map.Entry<String, ConsumerGroupTopicConf> conf : consumerGroupConfig.getConsumerGroupTopicConf().entrySet()) {
+            eventMeshConsumer.subscribe(conf.getKey(), conf.getValue().getSubscriptionItem());
         }
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
index 53e8a3c..14486bb 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
@@ -113,6 +113,7 @@ public class ConsumerManager {
                                                 ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
                                                 latestTopicConf.setConsumerGroup(consumerGroup);
                                                 latestTopicConf.setTopic(topic);
+                                                latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
                                                 latestTopicConf.setUrls(clientUrls);
 
                                                 latestTopicConf.setIdcUrls(idcUrls);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 36dee3c..e39afb2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -33,6 +33,8 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.api.MeshAsyncConsumeContext;
 import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
@@ -108,9 +110,9 @@ public class EventMeshConsumer {
         started4Broadcast.compareAndSet(false, true);
     }
 
-    public void subscribe(String topic) throws Exception {
+    public void subscribe(String topic, SubscriptionItem subscriptionItem) throws Exception {
         AsyncMessageListener listener = null;
-        if (!EventMeshUtil.isBroadcast(topic)) {
+        if (!SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
             listener = new AsyncMessageListener() {
                 @Override
                 public void consume(Message message, AsyncConsumeContext context) {
@@ -139,7 +141,7 @@ public class EventMeshConsumer {
                         }
                     }
                     HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
-                            topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+                            topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
 
                     if (httpMessageHandler.handle(handleMsgContext)) {
 //                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
@@ -188,7 +190,7 @@ public class EventMeshConsumer {
                         }
                     }
                     HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
-                            topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+                            topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
 
                     if (httpMessageHandler.handle(handleMsgContext)) {
 //                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
@@ -210,8 +212,8 @@ public class EventMeshConsumer {
         }
     }
 
-    public void unsubscribe(String topic) throws Exception {
-        if (EventMeshUtil.isBroadcast(topic)) {
+    public void unsubscribe(String topic, SubscriptionMode subscriptionMode) throws Exception {
+        if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
             broadcastMqConsumer.unsubscribe(topic);
         } else {
             persistentMqConsumer.unsubscribe(topic);
@@ -234,8 +236,8 @@ public class EventMeshConsumer {
         started4Broadcast.compareAndSet(true, false);
     }
 
-    public void updateOffset(String topic, List<Message> msgs, AbstractContext context) {
-        if (EventMeshUtil.isBroadcast(topic)) {
+    public void updateOffset(String topic, SubscriptionMode subscriptionMode, List<Message> msgs, AbstractContext context) {
+        if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
             broadcastMqConsumer.updateOffset(msgs, context);
         } else {
             persistentMqConsumer.updateOffset(msgs, context);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
index 01768ca..2d19174 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
@@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
@@ -50,6 +52,8 @@ public class HandleMsgContext {
 
     private String topic;
 
+    private SubscriptionItem subscriptionItem;
+
     private Message msg;
 
     private int ttl;
@@ -67,7 +71,7 @@ public class HandleMsgContext {
     private Map<String, String> props;
 
     public HandleMsgContext(String msgRandomNo, String consumerGroup, EventMeshConsumer eventMeshConsumer,
-                            String topic, Message msg,
+                            String topic, Message msg, SubscriptionItem subscriptionItem,
                             AbstractContext context, ConsumerGroupConf consumerGroupConfig,
                             EventMeshHTTPServer eventMeshHTTPServer, String bizSeqNo, String uniqueId, ConsumerGroupTopicConf consumeTopicConfig) {
         this.msgRandomNo = msgRandomNo;
@@ -75,6 +79,7 @@ public class HandleMsgContext {
         this.eventMeshConsumer = eventMeshConsumer;
         this.topic = topic;
         this.msg = msg;
+        this.subscriptionItem = subscriptionItem;
         this.context = context;
         this.consumerGroupConfig = consumerGroupConfig;
         this.eventMeshHTTPServer = eventMeshHTTPServer;
@@ -152,6 +157,14 @@ public class HandleMsgContext {
         this.msg = msg;
     }
 
+    public SubscriptionItem getSubscriptionItem() {
+        return subscriptionItem;
+    }
+
+    public void setSubscriptionItem(SubscriptionItem subscriptionItem) {
+        this.subscriptionItem = subscriptionItem;
+    }
+
     public long getCreateTime() {
         return createTime;
     }
@@ -188,7 +201,7 @@ public class HandleMsgContext {
 //                        msg.getProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER),
 //                        msg.getQueueId(), msg.getQueueOffset());
             }
-            eventMeshConsumer.updateOffset(topic, Arrays.asList(msg), context);
+            eventMeshConsumer.updateOffset(topic, subscriptionItem.getMode(), Arrays.asList(msg), context);
         }
     }
 
@@ -214,6 +227,7 @@ public class HandleMsgContext {
         sb.append("handleMsgContext={")
                 .append("consumerGroup=").append(consumerGroup)
                 .append(",topic=").append(topic)
+                .append(",subscriptionItem=").append(subscriptionItem)
                 .append(",consumeTopicConfig=").append(consumeTopicConfig)
                 .append(",bizSeqNo=").append(bizSeqNo)
                 .append(",uniqueId=").append(uniqueId)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 1d0b6e8..2186944 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -30,6 +30,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.common.IPUtil;
 import org.apache.eventmesh.common.command.HttpCommand;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody;
 import org.apache.eventmesh.common.protocol.http.body.client.SubscribeResponseBody;
 import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
@@ -98,7 +99,7 @@ public class SubscribeProcessor implements HttpRequestProcessor {
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
-        List<String> subTopicList = subscribeRequestBody.getTopics();
+        List<SubscriptionItem> subTopicList = subscribeRequestBody.getTopics();
 
         String url = subscribeRequestBody.getUrl();
         String consumerGroup = EventMeshUtil.buildClientGroup(subscribeRequestHeader.getSys(),
@@ -108,8 +109,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
 
             registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
 
-            for (String subTopic : subTopicList) {
-                List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic);
+            for (SubscriptionItem subTopic : subTopicList) {
+                List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic.getTopic());
 
                 if (CollectionUtils.isEmpty(groupTopicClients)) {
                     httpLogger.error("group {} topic {} clients is empty", consumerGroup, subTopic);
@@ -131,23 +132,25 @@ public class SubscribeProcessor implements HttpRequestProcessor {
                     consumerGroupConf = new ConsumerGroupConf(consumerGroup);
                     ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
                     consumeTopicConfig.setConsumerGroup(consumerGroup);
-                    consumeTopicConfig.setTopic(subTopic);
+                    consumeTopicConfig.setTopic(subTopic.getTopic());
+                    consumeTopicConfig.setSubscriptionItem(subTopic);
                     consumeTopicConfig.setUrls(new HashSet<>(Arrays.asList(url)));
 
                     consumeTopicConfig.setIdcUrls(idcUrls);
 
                     Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
-                    map.put(subTopic, consumeTopicConfig);
+                    map.put(subTopic.getTopic(), consumeTopicConfig);
                     consumerGroupConf.setConsumerGroupTopicConf(map);
                 } else {
                     // 已有订阅
                     Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
                     for (String key : map.keySet()) {
-                        if (StringUtils.equals(subTopic, key)) {
+                        if (StringUtils.equals(subTopic.getTopic(), key)) {
                             ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
                             ConsumerGroupTopicConf currentTopicConf = map.get(key);
                             latestTopicConf.setConsumerGroup(consumerGroup);
-                            latestTopicConf.setTopic(subTopic);
+                            latestTopicConf.setTopic(subTopic.getTopic());
+                            latestTopicConf.setSubscriptionItem(subTopic);
                             latestTopicConf.setUrls(new HashSet<>(Arrays.asList(url)));
                             latestTopicConf.getUrls().addAll(currentTopicConf.getUrls());
 
@@ -206,8 +209,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
     }
 
     private void registerClient(SubscribeRequestHeader subscribeRequestHeader, String consumerGroup,
-                                      List<String> topicList, String url) {
-        for(String topic: topicList) {
+                                      List<SubscriptionItem> subscriptionItems, String url) {
+        for(SubscriptionItem item: subscriptionItems) {
             Client client = new Client();
             client.env = subscribeRequestHeader.getEnv();
             client.dcn = subscribeRequestHeader.getDcn();
@@ -216,7 +219,7 @@ public class SubscribeProcessor implements HttpRequestProcessor {
             client.ip = subscribeRequestHeader.getIp();
             client.pid = subscribeRequestHeader.getPid();
             client.consumerGroup = consumerGroup;
-            client.topic = topic;
+            client.topic = item.getTopic();
             client.url = url;
             client.lastUpTime = new Date();
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index 4d9f2e2..b723054 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -166,6 +166,7 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                                 ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
                                 latestTopicConf.setConsumerGroup(consumerGroup);
                                 latestTopicConf.setTopic(unSubTopic);
+                                latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
                                 latestTopicConf.setUrls(clientUrls);
 
                                 latestTopicConf.setIdcUrls(idcUrls);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index af71ce8..831296a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.commons.text.RandomStringGenerator;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.IPUtil;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody;
 import org.apache.eventmesh.common.protocol.http.common.ClientRetCode;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
@@ -88,7 +89,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
         String requestCode = "";
 
-        if (EventMeshUtil.isService(handleMsgContext.getTopic())) {
+        if (SubcriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
             requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
         } else {
             requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 0bc645e..2c3a46f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -43,6 +43,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.api.MeshAsyncConsumeContext;
 import org.apache.eventmesh.api.RRCallback;
 import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
 import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -511,9 +513,9 @@ public class ClientGroupWrapper {
         logger.info("starting broadCastMsgConsumer success, group:{}", groupName);
     }
 
-    public void subscribe(String topic) throws Exception {
+    public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
         AsyncMessageListener listener = null;
-        if (EventMeshUtil.isBroadcast(topic)) {
+        if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
             listener = new AsyncMessageListener() {
                 @Override
                 public void consume(Message message, AsyncConsumeContext context) {
@@ -534,7 +536,7 @@ public class ClientGroupWrapper {
                     Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
 
                     DownStreamMsgContext downStreamMsgContext =
-                            new DownStreamMsgContext(message, null, broadCastMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false);
+                            new DownStreamMsgContext(message, null, broadCastMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem);
 
                     while (sessionsItr.hasNext()) {
                         Session session = sessionsItr.next();
@@ -562,7 +564,7 @@ public class ClientGroupWrapper {
                     context.commit(Action.CommitMessage);
                 }
             };
-            broadCastMsgConsumer.subscribe(topic, listener);
+            broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
         } else {
             listener = new AsyncMessageListener() {
                 @Override
@@ -606,7 +608,7 @@ public class ClientGroupWrapper {
                     }
 
                     DownStreamMsgContext downStreamMsgContext =
-                            new DownStreamMsgContext(message, session, persistentMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false);
+                            new DownStreamMsgContext(message, session, persistentMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem);
                     //msg put in eventmesh,waiting client ack
                     session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
                     session.downstreamMsg(downStreamMsgContext);
@@ -615,15 +617,15 @@ public class ClientGroupWrapper {
                     context.commit(Action.CommitMessage);
                 }
             };
-            persistentMsgConsumer.subscribe(topic, listener);
+            persistentMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
         }
     }
 
-    public void unsubscribe(String topic) throws Exception {
-        if (EventMeshUtil.isBroadcast(topic)) {
-            broadCastMsgConsumer.unsubscribe(topic);
+    public void unsubscribe(SubscriptionItem subscriptionItem) throws Exception {
+        if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
+            broadCastMsgConsumer.unsubscribe(subscriptionItem.getTopic());
         } else {
-            persistentMsgConsumer.unsubscribe(topic);
+            persistentMsgConsumer.unsubscribe(subscriptionItem.getTopic());
         }
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index c45c545..c4cd170 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -34,6 +34,8 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.ThreadUtil;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -280,10 +282,10 @@ public class ClientSessionGroupMapping {
      * @param session
      */
     private void cleanSubscriptionInSession(Session session) throws Exception {
-        for (String topic : session.getSessionContext().subscribeTopics.values()) {
-            session.getClientGroupWrapper().get().removeSubscription(topic, session);
-            if (!session.getClientGroupWrapper().get().hasSubscription(topic)) {
-                session.getClientGroupWrapper().get().unsubscribe(topic);
+        for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) {
+            session.getClientGroupWrapper().get().removeSubscription(item.getTopic(), session);
+            if (!session.getClientGroupWrapper().get().hasSubscription(item.getTopic())) {
+                session.getClientGroupWrapper().get().unsubscribe(item);
             }
         }
     }
@@ -298,7 +300,7 @@ public class ClientSessionGroupMapping {
         if (unAckMsg.size() > 0 && session.getClientGroupWrapper().get().getGroupConsumerSessions().size() > 0) {
             for (Map.Entry<String, DownStreamMsgContext> entry : unAckMsg.entrySet()) {
                 DownStreamMsgContext downStreamMsgContext = entry.getValue();
-                if (EventMeshUtil.isBroadcast(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) {
+                if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
                     logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt), session.getClient());
                     continue;
                 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 4ff0480..fca8d2a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -32,10 +32,9 @@ import io.openmessaging.api.SendCallback;
 
 import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.tcp.*;
 import org.apache.eventmesh.common.protocol.tcp.Package;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper;
@@ -162,27 +161,27 @@ public class Session {
         this.listenRequestSeq = listenRequestSeq;
     }
 
-    public void subscribe(List<String> topics) throws Exception {
-        for (String topic : topics) {
-            sessionContext.subscribeTopics.putIfAbsent(topic, topic);
-            clientGroupWrapper.get().subscribe(topic);
+    public void subscribe(List<SubscriptionItem> items) throws Exception {
+        for (SubscriptionItem item : items) {
+            sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item);
+            clientGroupWrapper.get().subscribe(item);
 
-            clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().getDefaultTopicRouteInfoFromNameServer(topic,
+            clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().getDefaultTopicRouteInfoFromNameServer(item.getTopic(),
                     EventMeshConstants.DEFAULT_TIME_OUT_MILLS);
 
-            clientGroupWrapper.get().addSubscription(topic, this);
-            subscribeLogger.info("subscribe|succeed|topic={}|user={}", topic, client);
+            clientGroupWrapper.get().addSubscription(item.getTopic(), this);
+            subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
         }
     }
 
-    public void unsubscribe(List<String> topics) throws Exception {
-        for (String topic : topics) {
-            sessionContext.subscribeTopics.remove(topic);
-            clientGroupWrapper.get().removeSubscription(topic, this);
+    public void unsubscribe(List<SubscriptionItem> items) throws Exception {
+        for (SubscriptionItem item : items) {
+            sessionContext.subscribeTopics.remove(item.getTopic());
+            clientGroupWrapper.get().removeSubscription(item.getTopic(), this);
 
-            if (!clientGroupWrapper.get().hasSubscription(topic)) {
-                clientGroupWrapper.get().unsubscribe(topic);
-                subscribeLogger.info("unSubscribe|succeed|topic={}|lastUser={}", topic, client);
+            if (!clientGroupWrapper.get().hasSubscription(item.getTopic())) {
+                clientGroupWrapper.get().unsubscribe(item);
+                subscribeLogger.info("unSubscribe|succeed|topic={}|lastUser={}", item.getTopic(), client);
             }
         }
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
index 9d5b22e..e8f851b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.session;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 
 public class SessionContext {
@@ -28,7 +29,7 @@ public class SessionContext {
 
     public ConcurrentHashMap<String, String> sendTopics = new ConcurrentHashMap<String, String>();
 
-    public ConcurrentHashMap<String, String> subscribeTopics = new ConcurrentHashMap<String, String>();
+    public ConcurrentHashMap<String, SubscriptionItem> subscribeTopics = new ConcurrentHashMap<String, SubscriptionItem>();
 
     public long createTime = System.currentTimeMillis();
 
@@ -38,7 +39,7 @@ public class SessionContext {
 
     @Override
     public String toString() {
-        return "SessionContext{subscribeTopics=" + subscribeTopics.keySet()
+        return "SessionContext{subscribeTopics=" + subscribeTopics
                 + ",sendTopics=" + sendTopics.keySet()
                 + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + "}";
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 01ba40e..8e441bd 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -28,6 +28,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
@@ -51,6 +53,8 @@ public class DownStreamMsgContext implements Delayed {
 
     public int retryTimes;
 
+    public SubscriptionItem subscriptionItem;
+
     private long executeTime;
 
     public long lastPushTime;
@@ -61,7 +65,7 @@ public class DownStreamMsgContext implements Delayed {
 
     public boolean msgFromOtherEventMesh;
 
-    public DownStreamMsgContext(Message msgExt, Session session, MQConsumerWrapper consumer, AbstractContext consumeConcurrentlyContext, boolean msgFromOtherEventMesh) {
+    public DownStreamMsgContext(Message msgExt, Session session, MQConsumerWrapper consumer, AbstractContext consumeConcurrentlyContext, boolean msgFromOtherEventMesh, SubscriptionItem subscriptionItem) {
         this.seq = String.valueOf(ServerGlobal.getInstance().getMsgCounter().incrementAndGet());
         this.msgExt = msgExt;
         this.session = session;
@@ -71,6 +75,7 @@ public class DownStreamMsgContext implements Delayed {
         this.lastPushTime = System.currentTimeMillis();
         this.executeTime = System.currentTimeMillis();
         this.createTime = System.currentTimeMillis();
+        this.subscriptionItem = subscriptionItem;
         String ttlStr = msgExt.getUserProperties("TTL");
         long ttl = StringUtils.isNumeric(ttlStr) ? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
         this.expireTime = System.currentTimeMillis() + ttl;
@@ -108,6 +113,7 @@ public class DownStreamMsgContext implements Delayed {
                 ",consumer=" + consumer +
 //  todo              ",consumerGroup=" + consumer.getClass().getConsumerGroup() +
                 ",topic=" + msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION) +
+                ",subscriptionItem=" + subscriptionItem +
                 ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) +
                 ",executeTime=" + DateFormatUtils.format(executeTime, EventMeshConstants.DATE_FORMAT) +
                 ",lastPushTime=" + DateFormatUtils.format(lastPushTime, EventMeshConstants.DATE_FORMAT) + '}';
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index b1171f2..12c7cd1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -21,10 +21,9 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
@@ -63,9 +62,9 @@ public class SessionPusher {
 
     public void push(final DownStreamMsgContext downStreamMsgContext) {
         Command cmd;
-        if (EventMeshUtil.isBroadcast(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) {
+        if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
             cmd = Command.BROADCAST_MESSAGE_TO_CLIENT;
-        } else if (EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) {
+        } else if (SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) {
             cmd = Command.REQUEST_TO_CLIENT;
         } else {
             cmd = Command.ASYNC_MESSAGE_TO_CLIENT;
@@ -102,7 +101,7 @@ public class SessionPusher {
                                 logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);
 
                                 //retry
-                                long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)) ? 0 : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills;
+                                long delayTime = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 0 : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills;
                                 downStreamMsgContext.delay(delayTime);
                                 session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
                             } else {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
index fc619b3..b8fcf0f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
@@ -28,6 +28,8 @@ import io.openmessaging.api.Message;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
@@ -74,7 +76,7 @@ public class EventMeshTcpRetryer {
             return;
         }
 
-        int maxRetryTimes = EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)) ? 1 : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryTimes;
+        int maxRetryTimes = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 1 : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryTimes;
         if (downStreamMsgContext.retryTimes >= maxRetryTimes) {
             logger.warn("pushRetry fail,retry over maxRetryTimes:{}, retryTimes:{}, seq:{}, bizSeq:{}", maxRetryTimes, downStreamMsgContext.retryTimes,
                     downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
@@ -118,7 +120,7 @@ public class EventMeshTcpRetryer {
 
             Session rechoosen = null;
             String topic = downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
-            if (!EventMeshUtil.isBroadcast(topic)) {
+            if (!SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
                 rechoosen = downStreamMsgContext.session.getClientGroupWrapper()
                         .get().getDownstreamDispatchStrategy().select(downStreamMsgContext.session.getClientGroupWrapper().get().getGroupName()
                                 , topic
@@ -127,7 +129,6 @@ public class EventMeshTcpRetryer {
                 rechoosen = downStreamMsgContext.session;
             }
 
-
             if (rechoosen == null) {
                 logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index c903742..2d4b80d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -85,6 +85,7 @@ public class MessageTransferTask extends AbstractTask {
                 long sendTime = System.currentTimeMillis();
                 addTimestamp(eventMeshMessage, cmd, sendTime);
                 if (cmd.equals(Command.REQUEST_TO_SERVER)) {
+                    // Sync request (REQUEST_TO_SERVER): attach this producer's MQ client id as the reply-to property
                     eventMeshMessage.getProperties().put(EventMeshConstants.PROPERTY_MESSAGE_REPLY_TO, session.getClientGroupWrapper()
                             .get().getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
                 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
index e26c667..ccacf56 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
@@ -22,13 +22,11 @@ import java.util.List;
 
 import io.netty.channel.ChannelHandlerContext;
 
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
-import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.Subscription;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.tcp.*;
+import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,17 +49,14 @@ public class SubscribeTask extends AbstractTask {
                 throw new Exception("subscriptionInfo is null");
             }
 
-            List<String> topicList = new ArrayList<>();
+            List<SubscriptionItem> subscriptionItems = new ArrayList<>();
             for (int i = 0; i < subscriptionInfo.getTopicList().size(); i++) {
-                String topic = subscriptionInfo.getTopicList().get(i);
-                if (!EventMeshUtil.isValidRMBTopic(topic)) {
-                    throw new Exception("invalid topic!");
-                }
-                topicList.add(topic);
+                SubscriptionItem item = subscriptionInfo.getTopicList().get(i);
+                subscriptionItems.add(item);
             }
             synchronized (session) {
-                session.subscribe(topicList);
-                messageLogger.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), topicList);
+                session.subscribe(subscriptionItems);
+                messageLogger.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), subscriptionItems);
             }
             msg.setHeader(new Header(Command.SUBSCRIBE_RESPONSE, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), pkg.getHeader()
                     .getSeq()));
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
index 3a338ed..c35245a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
@@ -19,10 +19,12 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import io.netty.channel.ChannelHandlerContext;
 
 import org.apache.commons.collections4.MapUtils;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.tcp.Command;
 import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.OPStatus;
@@ -46,10 +48,10 @@ public class UnSubscribeTask extends AbstractTask {
         Package msg = new Package();
         try {
             synchronized (session) {
-                List<String> topics = new ArrayList<String>();
+                List<SubscriptionItem> topics = new ArrayList<SubscriptionItem>();
                 if (MapUtils.isNotEmpty(session.getSessionContext().subscribeTopics)) {
-                    for (String topic : session.getSessionContext().subscribeTopics.keySet()) {
-                        topics.add(topic);
+                    for (Map.Entry<String, SubscriptionItem> entry : session.getSessionContext().subscribeTopics.entrySet()) {
+                        topics.add(entry.getValue());
                     }
                     session.unsubscribe(topics);
                     messageLogger.info("UnSubscriberTask succeed|user={}|topics={}", session.getClient(), topics);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index 3814fb7..0ecb375 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -115,64 +115,6 @@ public class EventMeshUtil {
         return str + "/namesrvAddr/" + idc;
     }
 
-    public static boolean isValidRMBTopic(String topic) {
-        if (StringUtils.isEmpty(topic) || !StringUtils.contains(topic, "-")) {
-            return false;
-        }
-
-        String[] args = StringUtils.split(topic, "-");
-        if (ArrayUtils.getLength(args) != 5) {
-            return false;
-        }
-
-        String s0e = args[1];
-        if (!StringUtils.equalsIgnoreCase("s", s0e) && !StringUtils.equalsIgnoreCase("e", s0e)) {
-            return false;
-        }
-
-        String service = args[2];
-        if (!StringUtils.isNumeric(service)) {
-            return false;
-        }
-
-        return true;
-    }
-
-    public static String getServiceIDStr(String topic) {
-        if (!isValidRMBTopic(topic)) {
-            return "";
-        }
-
-        String[] args = StringUtils.split(topic, "-");
-        return args[2];
-    }
-
-    public static String getPidStr(String topic) {
-        if (!isValidRMBTopic(topic)) {
-            return "";
-        }
-
-        String[] args = StringUtils.split(topic, "-");
-        return args[3];
-    }
-
-    public static boolean isService(String topic) {
-        String serviceStr = getServiceIDStr(topic);
-        if (StringUtils.isEmpty(serviceStr)) {
-            return false;
-        }
-        return "0".equals(StringUtils.substring(serviceStr, 3, 4));
-    }
-
-    public static boolean isBroadcast(String topic) {
-        String serviceStr = getServiceIDStr(topic);
-        if (StringUtils.isEmpty(serviceStr)) {
-            return false;
-        }
-        String[] args = StringUtils.split(topic, "-");
-        return "3".equals(StringUtils.substring(args[2], 3, 4)) || "4".equals(StringUtils.substring(args[2], 3, 4));
-    }
-
     /**
      * 自定义取堆栈
      *
diff --git a/eventmesh-runtime/src/test/java/client/EventMeshClient.java b/eventmesh-runtime/src/test/java/client/EventMeshClient.java
index ce4a13e..547bf61 100644
--- a/eventmesh-runtime/src/test/java/client/EventMeshClient.java
+++ b/eventmesh-runtime/src/test/java/client/EventMeshClient.java
@@ -17,9 +17,11 @@
 
 package client;
 
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 
 import client.hook.ReceiveMsgHook;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 
 public interface EventMeshClient {
 
@@ -37,13 +39,9 @@ public interface EventMeshClient {
 
     Package listen() throws Exception;
 
-    Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception;
+    Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
 
-    Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception;
-
-    Package justSubscribe(String topic) throws Exception;
-
-    Package justUnsubscribe(String topic) throws Exception;
+    Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
 
     void registerPubBusiHandler(ReceiveMsgHook handler) throws Exception;
 
diff --git a/eventmesh-runtime/src/test/java/client/SubClient.java b/eventmesh-runtime/src/test/java/client/SubClient.java
index dff635a..e591ea6 100644
--- a/eventmesh-runtime/src/test/java/client/SubClient.java
+++ b/eventmesh-runtime/src/test/java/client/SubClient.java
@@ -17,7 +17,9 @@
 
 package client;
 
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 
 import client.hook.ReceiveMsgHook;
@@ -32,13 +34,9 @@ public interface SubClient {
 
     void reconnect() throws Exception;
 
-    Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception;
+    Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
 
-    Package justSubscribe(String topic) throws Exception;
-
-    Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception;
-
-    Package justUnsubscribe(String topic) throws Exception;
+    Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
 
     Package listen() throws Exception;
 
diff --git a/eventmesh-runtime/src/test/java/client/common/MessageUtils.java b/eventmesh-runtime/src/test/java/client/common/MessageUtils.java
index 0eb313e..c1cc984 100644
--- a/eventmesh-runtime/src/test/java/client/common/MessageUtils.java
+++ b/eventmesh-runtime/src/test/java/client/common/MessageUtils.java
@@ -21,12 +21,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Subscription;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
+import org.apache.eventmesh.common.protocol.tcp.Package;
 
 public class MessageUtils {
     public static int seqLength = 10;
@@ -63,10 +63,10 @@ public class MessageUtils {
         return msg;
     }
 
-    public static Package subscribe(String topic) {
+    public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
         Package msg = new Package();
         msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
-        msg.setBody(generateSubscription(topic));
+        msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
         return msg;
     }
 
@@ -76,10 +76,10 @@ public class MessageUtils {
         return msg;
     }
 
-    public static Package unsubscribe(String topic) {
+    public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
         Package msg = new Package();
         msg.setHeader(new Header(Command.UNSUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
-        msg.setBody(generateSubscription(topic));
+        msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
         return msg;
     }
 
@@ -170,20 +170,20 @@ public class MessageUtils {
 
     public static Subscription generateSubscription() {
         Subscription subscription = new Subscription();
-        List<String> topicList = new ArrayList<>();
-        topicList.add("FT0-s-80000000-01-0");
-        topicList.add("FT0-s-80000000-02-0");
-        topicList.add("FT0-s-80000000-03-0");
-        topicList.add("FT0-s-80000000-04-0");
-        subscription.setTopicList(topicList);
+        List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+        subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
+        subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-02-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
+        subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-03-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
+        subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-04-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
+        subscription.setTopicList(subscriptionItems);
         return subscription;
     }
 
-    public static Subscription generateSubscription(String topic) {
+    public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
         Subscription subscription = new Subscription();
-        List<String> topicList = new ArrayList<>();
-        topicList.add(topic);
-        subscription.setTopicList(topicList);
+        List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+        subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
+        subscription.setTopicList(subscriptionItems);
         return subscription;
     }
 
diff --git a/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java b/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java
index 5f58c1e..3913bcd 100644
--- a/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java
+++ b/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java
@@ -17,7 +17,9 @@
 
 package client.impl;
 
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 
 import client.EventMeshClient;
@@ -76,26 +78,19 @@ public class EventMeshClientImpl implements EventMeshClient {
         this.subClient.heartbeat();
     }
 
-    public Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception {
-        return this.subClient.justSubscribe(serviceId, scenario, dcn);
-    }
 
     public Package listen() throws Exception {
         return this.subClient.listen();
     }
 
-    public Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception {
-        return this.subClient.justUnsubscribe(serviceId, scenario, dcn);
-    }
-
     @Override
-    public Package justSubscribe(String topic) throws Exception {
-        return this.subClient.justSubscribe(topic);
+    public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+        return this.subClient.justSubscribe(topic, subscriptionMode, subcriptionType);
     }
 
     @Override
-    public Package justUnsubscribe(String topic) throws Exception {
-        return this.subClient.justUnsubscribe(topic);
+    public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+        return this.subClient.justUnsubscribe(topic, subscriptionMode, subcriptionType);
     }
 
 
diff --git a/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java b/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java
index 3a61b48..e771715 100644
--- a/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java
+++ b/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java
@@ -27,10 +27,11 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
 import org.apache.eventmesh.common.protocol.tcp.Package;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ public class SubClientImpl extends TCPClient implements SubClient {
 
     private ReceiveMsgHook callback;
 
-    private List<String> topics = new ArrayList<String>();
+    private List<SubscriptionItem> subscriptionItems = new ArrayList<SubscriptionItem>();
 
     private ScheduledFuture<?> task;
 
@@ -72,9 +73,9 @@ public class SubClientImpl extends TCPClient implements SubClient {
     public void reconnect() throws Exception {
         super.reconnect();
         hello();
-        if (!CollectionUtils.isEmpty(topics)) {
-            for (String topic : topics) {
-                Package request = MessageUtils.subscribe(topic);
+        if (!CollectionUtils.isEmpty(subscriptionItems)) {
+            for (SubscriptionItem item : subscriptionItems) {
+                Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType());
                 this.dispatcher(request, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
             }
         }
@@ -117,14 +118,9 @@ public class SubClientImpl extends TCPClient implements SubClient {
         this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
     }
 
-    public Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception {
-        Package msg = MessageUtils.subscribe();
-        return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
-    }
-
-    public Package justSubscribe(String topic) throws Exception {
-        topics.add(topic);
-        Package msg = MessageUtils.subscribe(topic);
+    public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+        subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
+        Package msg = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType);
         return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
     }
 
@@ -144,14 +140,20 @@ public class SubClientImpl extends TCPClient implements SubClient {
 //        this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
 //    }
 
-    public Package justUnsubscribe(String topic) throws Exception {
-        topics.remove(topic);
-        Package msg = MessageUtils.unsubscribe(topic);
-        return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
-    }
-
-    public Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception {
-        Package msg = MessageUtils.unsubscribe();
+    /**
+     * Drops the locally tracked subscription(s) for {@code topic} and sends an unsubscribe request.
+     *
+     * @param topic            topic to unsubscribe from
+     * @param subscriptionMode mode placed in the unsubscribe request body
+     * @param subcriptionType  type placed in the unsubscribe request body
+     * @return the package returned by the dispatcher for the unsubscribe request
+     * @throws Exception propagated from the dispatcher
+     */
+    public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+        // subscriptionItems holds SubscriptionItem objects, so List.remove(topic) with a String never
+        // matched anything; match on the item's topic so reconnect() does not re-subscribe it.
+        subscriptionItems.removeIf(item -> topic != null && topic.equals(item.getTopic()));
+        Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subcriptionType);
         return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
     }
 
diff --git a/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java b/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java
index eb55d0e..fc479ca 100644
--- a/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java
+++ b/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java
@@ -19,6 +19,7 @@ package demo;
 
 import io.netty.channel.ChannelHandlerContext;
 
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 
@@ -26,13 +27,14 @@ import client.common.ClientConstants;
 import client.common.MessageUtils;
 import client.hook.ReceiveMsgHook;
 import client.impl.SubClientImpl;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 
 public class AsyncSubClient {
     public static void main(String[] args) throws Exception {
         SubClientImpl client = new SubClientImpl("127.0.0.1", 10002, MessageUtils.generateSubServer());
         client.init();
         client.heartbeat();
-        client.justSubscribe(ClientConstants.ASYNC_TOPIC);
+        client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
         client.registerBusiHandler(new ReceiveMsgHook() {
             @Override
             public void handle(Package msg, ChannelHandlerContext ctx) {
diff --git a/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java b/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java
index f3f29f6..3d9385d 100644
--- a/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java
+++ b/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java
@@ -19,6 +19,7 @@ package demo;
 
 import io.netty.channel.ChannelHandlerContext;
 
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Command;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -27,13 +28,14 @@ import client.common.ClientConstants;
 import client.common.MessageUtils;
 import client.hook.ReceiveMsgHook;
 import client.impl.SubClientImpl;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 
 public class BroadCastSubClient {
     public static void main(String[] args) throws Exception {
         SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
         client.init();
         client.heartbeat();
-        client.justSubscribe(ClientConstants.BROADCAST_TOPIC);
+        client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
         client.registerBusiHandler(new ReceiveMsgHook() {
             @Override
             public void handle(Package msg, ChannelHandlerContext ctx) {
diff --git a/eventmesh-runtime/src/test/java/demo/CCSubClient.java b/eventmesh-runtime/src/test/java/demo/CCSubClient.java
index d415743..935a554 100644
--- a/eventmesh-runtime/src/test/java/demo/CCSubClient.java
+++ b/eventmesh-runtime/src/test/java/demo/CCSubClient.java
@@ -19,6 +19,7 @@ package demo;
 
 import io.netty.channel.ChannelHandlerContext;
 
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Command;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 
@@ -26,6 +27,7 @@ import client.common.MessageUtils;
 import client.common.UserAgentUtils;
 import client.hook.ReceiveMsgHook;
 import client.impl.SubClientImpl;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 
 public class CCSubClient {
 
@@ -34,7 +36,7 @@ public class CCSubClient {
         subClient.init();
         subClient.heartbeat();
         subClient.listen();
-        subClient.justSubscribe("FT0-s-80000000-01-0");
+        subClient.justSubscribe("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
         subClient.registerBusiHandler(new ReceiveMsgHook() {
             @Override
             public void handle(Package msg, ChannelHandlerContext ctx) {
diff --git a/eventmesh-runtime/src/test/java/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/demo/CClientDemo.java
index 590aa3f..61c4f52 100644
--- a/eventmesh-runtime/src/test/java/demo/CClientDemo.java
+++ b/eventmesh-runtime/src/test/java/demo/CClientDemo.java
@@ -19,8 +19,10 @@ package demo;
 
 import io.netty.channel.ChannelHandlerContext;
 
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Command;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,8 +46,8 @@ public class CClientDemo {
         EventMeshClientImpl client = new EventMeshClientImpl("127.0.0.1", 10000);
         client.init();
         client.heartbeat();
-        client.justSubscribe(ASYNC_TOPIC);
-        client.justSubscribe(BROADCAST_TOPIC);
+        client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
+        client.justSubscribe(BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
         client.listen();
 //        for (int i = 0; i < 10000; i++) {
 //            Package rr = null;
diff --git a/eventmesh-runtime/src/test/java/demo/SyncSubClient.java b/eventmesh-runtime/src/test/java/demo/SyncSubClient.java
index 10e13c4..9bc1141 100644
--- a/eventmesh-runtime/src/test/java/demo/SyncSubClient.java
+++ b/eventmesh-runtime/src/test/java/demo/SyncSubClient.java
@@ -19,6 +19,7 @@ package demo;
 
 import io.netty.channel.ChannelHandlerContext;
 
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Command;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 
@@ -26,13 +27,14 @@ import client.common.ClientConstants;
 import client.common.MessageUtils;
 import client.hook.ReceiveMsgHook;
 import client.impl.SubClientImpl;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 
 public class SyncSubClient {
     public static void main(String[] args) throws Exception {
         SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
         client.init();
         client.heartbeat();
-        client.justSubscribe(ClientConstants.SYNC_TOPIC);
+        client.justSubscribe(ClientConstants.SYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
         client.registerBusiHandler(new ReceiveMsgHook() {
             @Override
             public void handle(Package msg, ChannelHandlerContext ctx) {
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
index ede5ef0..55b7e16 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
@@ -17,8 +17,7 @@
 
 package org.apache.eventmesh.client.http.consumer;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +44,7 @@ import org.apache.eventmesh.client.tcp.common.EventMeshThreadFactoryImpl;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.EventMeshException;
 import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
 import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody;
 import org.apache.eventmesh.common.protocol.http.common.ClientType;
@@ -52,6 +52,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
+import org.apache.eventmesh.common.protocol.tcp.Subscription;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.slf4j.Logger;
@@ -67,7 +68,7 @@ public class LiteConsumer extends AbstractLiteClient {
 
     protected LiteClientConfig eventMeshClientConfig;
 
-    private List<String> subscription = Lists.newArrayList();
+    private List<SubscriptionItem> subscription = Lists.newArrayList();
 
     private LiteMessageListener messageListener;
 
@@ -116,7 +117,7 @@ public class LiteConsumer extends AbstractLiteClient {
         logger.info("LiteConsumer shutdown");
     }
 
-    public boolean subscribe(List<String> topicList, String url) throws Exception {
+    public boolean subscribe(List<SubscriptionItem> topicList, String url) throws Exception {
         subscription.addAll(topicList);
         if (!started.get()) {
             start();
@@ -146,7 +147,7 @@ public class LiteConsumer extends AbstractLiteClient {
 
     }
 
-    private RequestParam generateSubscribeRequestParam(List<String> topicList, String url) {
+    private RequestParam generateSubscribeRequestParam(List<SubscriptionItem> topicList, String url) {
 //        final LiteMessage liteMessage = new LiteMessage();
 //        liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30))
 //                .setContent("subscribe message")
@@ -170,11 +171,11 @@ public class LiteConsumer extends AbstractLiteClient {
         return requestParam;
     }
 
-    private RequestParam generateHeartBeatRequestParam(List<String> topics, String url) {
+    private RequestParam generateHeartBeatRequestParam(List<SubscriptionItem> topics, String url) {
         List<HeartbeatRequestBody.HeartbeatEntity> heartbeatEntities = new ArrayList<>();
-        for (String topic : topics) {
+        for (SubscriptionItem item : topics) {
             HeartbeatRequestBody.HeartbeatEntity heartbeatEntity = new HeartbeatRequestBody.HeartbeatEntity();
-            heartbeatEntity.topic = topic;
+            heartbeatEntity.topic = item.getTopic();
             heartbeatEntity.url = url;
             heartbeatEntities.add(heartbeatEntity);
         }
@@ -198,7 +199,7 @@ public class LiteConsumer extends AbstractLiteClient {
         return requestParam;
     }
 
-    public void heartBeat(List<String> topicList, String url) throws Exception {
+    public void heartBeat(List<SubscriptionItem> topicList, String url) throws Exception {
         scheduler.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
@@ -234,7 +235,15 @@ public class LiteConsumer extends AbstractLiteClient {
     }
 
     public boolean unsubscribe(List<String> topicList, String url) throws Exception {
-        subscription.removeAll(topicList);
+        Set<String> unSub = new HashSet<>(topicList);
+        Iterator<SubscriptionItem> itr = subscription.iterator();
+        while(itr.hasNext()) {
+            SubscriptionItem item = itr.next();
+            if (unSub.contains(item.getTopic())) {
+                itr.remove();
+            }
+        }
+
         RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url);
 
         long startTime = System.currentTimeMillis();
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java
index 4e11260..afdb90a 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java
@@ -19,7 +19,9 @@ package org.apache.eventmesh.client.tcp;
 
 import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 
 public interface EventMeshClient {
 
@@ -39,7 +41,7 @@ public interface EventMeshClient {
 
     void listen() throws Exception;
 
-    void subscribe(String topic) throws Exception;
+    void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
 
     void unsubscribe() throws Exception;
 
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java
index 251fd93..3803edc 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java
@@ -19,6 +19,8 @@ package org.apache.eventmesh.client.tcp;
 
 
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 
 public interface SimpleSubClient {
@@ -30,7 +32,7 @@ public interface SimpleSubClient {
 
     void reconnect() throws Exception;
 
-    void subscribe(String topic) throws Exception;
+    void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
 
     void unsubscribe() throws Exception;
 
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
index 4122a23..87ef68a 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
@@ -21,10 +21,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.Header;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Subscription;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 
 public class MessageUtils {
@@ -55,10 +56,10 @@ public class MessageUtils {
         return msg;
     }
 
-    public static Package subscribe(String topic) {
+    public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
         Package msg = new Package();
         msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
-        msg.setBody(generateSubscription(topic));
+        msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
         return msg;
     }
 
@@ -130,11 +131,11 @@ public class MessageUtils {
         return user;
     }
 
-    private static Subscription generateSubscription(String topic) {
+    private static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
         Subscription subscription = new Subscription();
-        List<String> topicList = new ArrayList<>();
-        topicList.add(topic);
-        subscription.setTopicList(topicList);
+        List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+        subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
+        subscription.setTopicList(subscriptionItems);
         return subscription;
     }
 
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java
index 4cc14eb..8e645fa 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java
@@ -24,7 +24,9 @@ import org.apache.eventmesh.client.tcp.SimpleSubClient;
 import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
 import org.apache.eventmesh.client.tcp.common.MessageUtils;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 
 public class DefaultEventMeshClient implements EventMeshClient {
@@ -95,8 +97,8 @@ public class DefaultEventMeshClient implements EventMeshClient {
     }
 
     @Override
-    public void subscribe(String topic) throws Exception {
-        this.subClient.subscribe(topic);
+    public void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+        this.subClient.subscribe(topic, subscriptionMode, subcriptionType);
     }
 
     @Override
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java
index 0579184..38d52f6 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java
@@ -33,10 +33,12 @@ import org.apache.eventmesh.client.tcp.common.MessageUtils;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
 import org.apache.eventmesh.client.tcp.common.RequestContext;
 import org.apache.eventmesh.client.tcp.common.TcpClient;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
-import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
 
+import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +50,7 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
 
     private ReceiveMsgHook callback;
 
-    private List<String> topics = new ArrayList<String>();
+    private List<SubscriptionItem> subscriptionItems = new ArrayList<SubscriptionItem>();
 
     private ScheduledFuture<?> task;
 
@@ -70,9 +72,9 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
     public void reconnect() throws Exception {
         super.reconnect();
         hello();
-        if (!CollectionUtils.isEmpty(topics)) {
-            for (String topic : topics) {
-                Package request = MessageUtils.subscribe(topic);
+        if (!CollectionUtils.isEmpty(subscriptionItems)) {
+            for (SubscriptionItem item : subscriptionItems) {
+                Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType());
                 this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
             }
         }
@@ -121,9 +123,9 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
     }
 
 
-    public void subscribe(String topic) throws Exception {
-        topics.add(topic);
-        Package request = MessageUtils.subscribe(topic);
+    public void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+        subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
+        Package request = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType);
         this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
     }
 
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java
index 319e428..12da846 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java
@@ -24,6 +24,8 @@ import org.apache.eventmesh.client.tcp.EventMeshClient;
 import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
 import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -45,7 +47,7 @@ public class AsyncSubscribe implements ReceiveMsgHook {
             client.init();
             client.heartbeat();
 
-            client.subscribe("FT0-e-80010000-01-1");
+            client.subscribe("FT0-e-80010000-01-1", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
             client.registerSubBusiHandler(handler);
 
             client.listen();
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java
index 8b77e1a..e21ee45 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java
@@ -24,6 +24,8 @@ import org.apache.eventmesh.client.tcp.EventMeshClient;
 import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
 import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -45,7 +47,7 @@ public class AsyncSubscribeBroadcast implements ReceiveMsgHook {
             client.init();
             client.heartbeat();
 
-            client.subscribe("FT0-e-80030001-01-3");
+            client.subscribe("FT0-e-80030001-01-3", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
             client.registerSubBusiHandler(handler);
 
             client.listen();
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java
index b4fbbf7..756150a 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java
@@ -23,6 +23,8 @@ import org.apache.eventmesh.client.tcp.EventMeshClient;
 import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
 import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class SyncResponse implements ReceiveMsgHook {
             client.init();
             client.heartbeat();
 
-            client.subscribe("FT0-s-80000000-01-0");
+            client.subscribe("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
             //同步RR消息
             client.registerSubBusiHandler(handler);
 
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
index ed87688..3fcfa3f 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
@@ -19,6 +19,7 @@
 
 package org.apache.eventmesh.http.demo.sub.service;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
@@ -29,6 +30,9 @@ import org.apache.eventmesh.client.http.consumer.LiteConsumer;
 import org.apache.eventmesh.common.EventMeshException;
 import org.apache.eventmesh.common.IPUtil;
 import org.apache.eventmesh.common.ThreadUtil;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.http.demo.AsyncPublishInstance;
 import org.apache.eventmesh.util.Utils;
 import org.slf4j.Logger;
@@ -48,7 +52,7 @@ public class SubService implements InitializingBean {
 
     final Properties properties = Utils.readPropertiesFile("application.properties");
 
-    final List<String> topicList = Arrays.asList("FT0-e-80010001-01-1");
+    final List<SubscriptionItem> topicList = Arrays.asList(new SubscriptionItem("FT0-e-80010001-01-1", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC));
     final String localIp = IPUtil.getLocalAddress();
     final String localPort = properties.getProperty("server.port");
     final String eventMeshIp = properties.getProperty("eventmesh.ip");
@@ -100,7 +104,11 @@ public class SubService implements InitializingBean {
     public void cleanup() {
         logger.info("start destory ....");
         try {
-            liteConsumer.unsubscribe(topicList, url);
+            List<String> unSubList = new ArrayList<>();
+            for (SubscriptionItem item:topicList) {
+                unSubList.add(item.getTopic());
+            }
+            liteConsumer.unsubscribe(unSubList, url);
         } catch (Exception e) {
             e.printStackTrace();
         }
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java
index 1f7d3c8..4cde0ee 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java
@@ -24,8 +24,10 @@ import io.netty.channel.ChannelHandlerContext;
 import org.apache.eventmesh.client.tcp.EventMeshClient;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
 import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
 import org.apache.eventmesh.util.Utils;
@@ -50,7 +52,7 @@ public class AsyncSubscribe implements ReceiveMsgHook {
             client.init();
             client.heartbeat();
 
-            client.subscribe("FT0-e-80010000-01-1");
+            client.subscribe("FT0-e-80010000-01-1", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
             client.registerSubBusiHandler(handler);
 
             client.listen();
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java
index 74bdaaa..cf2323a 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java
@@ -24,8 +24,10 @@ import io.netty.channel.ChannelHandlerContext;
 import org.apache.eventmesh.client.tcp.EventMeshClient;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
 import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
 import org.apache.eventmesh.util.Utils;
@@ -50,7 +52,7 @@ public class AsyncSubscribeBroadcast implements ReceiveMsgHook {
             client.init();
             client.heartbeat();
 
-            client.subscribe("FT0-e-80030000-01-3");
+            client.subscribe("FT0-e-80030000-01-3", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
             client.registerSubBusiHandler(handler);
 
             client.listen();
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java
index 969163d..c4cb72e 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java
@@ -22,7 +22,9 @@ import io.netty.channel.ChannelHandlerContext;
 import org.apache.eventmesh.client.tcp.EventMeshClient;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
 import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
 import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class SyncResponse implements ReceiveMsgHook {
             client.init();
             client.heartbeat();
 
-            client.subscribe("FT0-s-80000000-01-0");
+            client.subscribe("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
             //同步RR消息
             client.registerSubBusiHandler(handler);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org