You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/22 02:24:45 UTC

[incubator-eventmesh] branch master updated: [Issue #774] Optimize the object property description of eventmesh client (#775)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 12a223c  [Issue #774] Optimize the object property description of eventmesh client (#775)
12a223c is described below

commit 12a223c4f28f9c46f559c1e88eb87325bb8a7639
Author: lrhkobe <34...@users.noreply.github.com>
AuthorDate: Tue Feb 22 10:24:38 2022 +0800

    [Issue #774] Optimize the object property description of eventmesh client (#775)
    
    * modify: add group field in UserAgent, delete ProducerGroup and ConsumerGroup field
    
    * modify: fix checkstyle error
    
    * modify: fix checkstyle error in ClientGroupWrapper.java
    
    close #774
---
 .../eventmesh/common/protocol/tcp/UserAgent.java   |  19 +-
 .../eventmesh/tcp/common/EventMeshTestUtils.java   |   6 +-
 .../handler/ShowListenClientByTopicHandler.java    |   2 +-
 .../tcp/client/group/ClientGroupWrapper.java       | 366 ++++++++++-----------
 .../client/group/ClientSessionGroupMapping.java    |  26 +-
 .../core/protocol/tcp/client/task/HelloTask.java   |   9 +-
 .../protocol/tcp/client/task/RecommendTask.java    |   8 +-
 .../eventmesh/client/tcp/common/MessageUtils.java  |   6 +-
 .../client/tcp/common/EventMeshTestUtils.java      |   6 +-
 9 files changed, 213 insertions(+), 235 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java
index 077ae9a..9e1d98b 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java
@@ -36,8 +36,7 @@ public class UserAgent {
     private String username;
     private String password;
     private String idc;
-    private String producerGroup;
-    private String consumerGroup;
+    private String group;
     private String purpose;
     @Builder.Default
     private int unack = 0;
@@ -46,8 +45,7 @@ public class UserAgent {
     }
 
     public UserAgent(String env, String subsystem, String path, int pid, String host, int port, String version,
-                     String username, String password, String idc, String producerGroup, String consumerGroup,
-                     String purpose, int unack) {
+                     String username, String password, String idc, String group, String purpose, int unack) {
         this.env = env;
         this.subsystem = subsystem;
         this.path = path;
@@ -58,8 +56,7 @@ public class UserAgent {
         this.username = username;
         this.password = password;
         this.idc = idc;
-        this.producerGroup = producerGroup;
-        this.consumerGroup = consumerGroup;
+        this.group = group;
         this.purpose = purpose;
         this.unack = unack;
     }
@@ -67,8 +64,9 @@ public class UserAgent {
     @Override
     public String toString() {
         return String.format(
-                "UserAgent{env='%s', subsystem='%s', path='%s', pid=%d, host='%s', port=%d, version='%s', idc='%s', purpose='%s', unack='%d'}",
-                env, subsystem, path, pid, host, port, version, idc, purpose, unack);
+                "UserAgent{env='%s', subsystem='%s', group='%s', path='%s', pid=%d, host='%s',"
+                + " port=%d, version='%s', idc='%s', purpose='%s', unack='%d'}",
+                env, subsystem, group, path, pid, host, port, version, idc, purpose, unack);
     }
 
     @Override
@@ -99,6 +97,10 @@ public class UserAgent {
             return false;
         }
 
+        if (!Objects.equals(group, userAgent.group)) {
+            return false;
+        }
+
 
         if (!Objects.equals(path, userAgent.path)) {
             return false;
@@ -134,6 +136,7 @@ public class UserAgent {
     @Override
     public int hashCode() {
         int result = subsystem != null ? subsystem.hashCode() : 0;
+        result = 31 * result + (group != null ? group.hashCode() : 0);
         result = 31 * result + (path != null ? path.hashCode() : 0);
         result = 31 * result + pid;
         result = 31 * result + (host != null ? host.hashCode() : 0);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index f7da239..dbb2729 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -51,8 +51,7 @@ public class EventMeshTestUtils {
                 .host("127.0.0.1")
                 .password(generateRandomString(8))
                 .username("PU4283")
-                .producerGroup("EventmeshTest-ProducerGroup")
-                .consumerGroup("EventmeshTest-ConsumerGroup")
+                .group("EventmeshTestGroup")
                 .path("/data/app/umg_proxy")
                 .port(8362)
                 .subsystem("5023")
@@ -70,8 +69,7 @@ public class EventMeshTestUtils {
                 .host("127.0.0.1")
                 .password(generateRandomString(8))
                 .username("PU4283")
-                .producerGroup("EventmeshTest-ProducerGroup")
-                .consumerGroup("EventmeshTest-ConsumerGroup")
+                .group("EventmeshTestGroup")
                 .path("/data/app/umg_proxy")
                 .port(9362)
                 .subsystem("5017")
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
index 95b3272..a7b11be 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
@@ -67,7 +67,7 @@ public class ShowListenClientByTopicHandler implements HttpHandler {
                 for (ClientGroupWrapper cgw : clientGroupMap.values()) {
                     Set<Session> listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic);
                     if (listenSessionSet != null && listenSessionSet.size() > 0) {
-                        result += String.format("group:%s", cgw.getConsumerGroup()) + newLine;
+                        result += String.format("group:%s", cgw.getGroup()) + newLine;
                         for (Session session : listenSessionSet) {
                             UserAgent userAgent = session.getClient();
                             result += String.format("pid=%s | ip=%s | port=%s | path=%s | version=%s", userAgent.getPid(), userAgent
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 091f510..7d45219 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -69,13 +69,10 @@ public class ClientGroupWrapper {
 
     public static Logger logger = LoggerFactory.getLogger(ClientGroupWrapper.class);
 
-    private String producerGroup;
-
-    private String consumerGroup;
-
-    //can be sysid + ext(eg dcn)
     private String sysId;
 
+    private String group;
+
     private EventMeshTCPConfiguration eventMeshTCPConfiguration;
 
     private EventMeshTCPServer eventMeshTCPServer;
@@ -105,29 +102,29 @@ public class ClientGroupWrapper {
     private MQConsumerWrapper broadCastMsgConsumer;
 
     private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
-            new ConcurrentHashMap<String, Set<Session>>();
+        new ConcurrentHashMap<String, Set<Session>>();
 
     public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
 
     private MQProducerWrapper mqProducerWrapper;
 
-    public ClientGroupWrapper(String sysId, String producerGroup, String consumerGroup,
+    public ClientGroupWrapper(String sysId, String group,
                               EventMeshTCPServer eventMeshTCPServer,
                               DownstreamDispatchStrategy downstreamDispatchStrategy) {
         this.sysId = sysId;
-        this.producerGroup = producerGroup;
-        this.consumerGroup = consumerGroup;
+        this.group = group;
         this.eventMeshTCPServer = eventMeshTCPServer;
         this.eventMeshTCPConfiguration = eventMeshTCPServer.getEventMeshTCPConfiguration();
         this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
-        this.eventMeshTcpMonitor = Preconditions.checkNotNull(eventMeshTCPServer.getEventMeshTcpMonitor());
+        this.eventMeshTcpMonitor =
+            Preconditions.checkNotNull(eventMeshTCPServer.getEventMeshTcpMonitor());
         this.downstreamDispatchStrategy = downstreamDispatchStrategy;
         this.persistentMsgConsumer = new MQConsumerWrapper(
-                eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+            eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
         this.broadCastMsgConsumer = new MQConsumerWrapper(
-                eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+            eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
         this.mqProducerWrapper = new MQProducerWrapper(
-                eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+            eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
     }
 
     public ConcurrentHashMap<String, Set<Session>> getTopic2sessionInGroupMapping() {
@@ -149,13 +146,14 @@ public class ClientGroupWrapper {
     }
 
     public boolean send(UpStreamMsgContext upStreamMsgContext, SendCallback sendCallback)
-            throws Exception {
+        throws Exception {
         mqProducerWrapper.send(upStreamMsgContext.getEvent(), sendCallback);
         return true;
     }
 
-    public void request(UpStreamMsgContext upStreamMsgContext, RequestReplyCallback rrCallback, long timeout)
-            throws Exception {
+    public void request(UpStreamMsgContext upStreamMsgContext, RequestReplyCallback rrCallback,
+                        long timeout)
+        throws Exception {
         mqProducerWrapper.request(upStreamMsgContext.getEvent(), rrCallback, timeout);
     }
 
@@ -168,10 +166,11 @@ public class ClientGroupWrapper {
 
             @Override
             public void onException(OnExceptionContext context) {
-                String bizSeqNo = (String) upStreamMsgContext.getEvent().getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
+                String bizSeqNo = (String) upStreamMsgContext.getEvent()
+                    .getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
                 logger.error("reply err! topic:{}, bizSeqNo:{}, client:{}",
-                        upStreamMsgContext.getEvent().getSubject(), bizSeqNo,
-                        upStreamMsgContext.getSession().getClient(), context.getException());
+                    upStreamMsgContext.getEvent().getSubject(), bizSeqNo,
+                    upStreamMsgContext.getSession().getClient(), context.getException());
             }
         });
         return true;
@@ -182,8 +181,8 @@ public class ClientGroupWrapper {
     }
 
     public boolean addSubscription(String topic, Session session) throws Exception {
-        if (session == null || !StringUtils.equalsIgnoreCase(consumerGroup,
-                EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+        if (session == null || !StringUtils.equalsIgnoreCase(group,
+            EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
             logger.error("addSubscription param error,topic:{},session:{}", topic, session);
             return false;
         }
@@ -197,16 +196,16 @@ public class ClientGroupWrapper {
             }
             r = topic2sessionInGroupMapping.get(topic).add(session);
             if (r) {
-                logger.info("addSubscription success, group:{} topic:{} client:{}", consumerGroup,
-                        topic, session.getClient());
+                logger.info("addSubscription success, group:{} topic:{} client:{}", group,
+                    topic, session.getClient());
             } else {
                 logger
-                        .warn("addSubscription fail, group:{} topic:{} client:{}", consumerGroup, topic,
-                                session.getClient());
+                    .warn("addSubscription fail, group:{} topic:{} client:{}", group, topic,
+                        session.getClient());
             }
         } catch (Exception e) {
             logger
-                    .error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
+                .error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
             throw new Exception("addSubscription fail");
         } finally {
             this.groupLock.writeLock().unlock();
@@ -216,8 +215,8 @@ public class ClientGroupWrapper {
 
     public boolean removeSubscription(String topic, Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(consumerGroup,
-                EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(group,
+            EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
             logger.error("removeSubscription param error,topic:{},session:{}", topic, session);
             return false;
         }
@@ -229,22 +228,22 @@ public class ClientGroupWrapper {
                 r = topic2sessionInGroupMapping.get(topic).remove(session);
                 if (r) {
                     logger.info(
-                            "removeSubscription remove session success, group:{} topic:{} client:{}",
-                            consumerGroup, topic, session.getClient());
+                        "removeSubscription remove session success, group:{} topic:{} client:{}",
+                        group, topic, session.getClient());
                 } else {
                     logger.warn(
-                            "removeSubscription remove session failed, group:{} topic:{} client:{}",
-                            consumerGroup, topic, session.getClient());
+                        "removeSubscription remove session failed, group:{} topic:{} client:{}",
+                        group, topic, session.getClient());
                 }
             }
             if (CollectionUtils.size(topic2sessionInGroupMapping.get(topic)) == 0) {
                 topic2sessionInGroupMapping.remove(topic);
                 logger.info("removeSubscription remove topic success, group:{} topic:{}",
-                        consumerGroup, topic);
+                    group, topic);
             }
         } catch (Exception e) {
             logger.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(),
-                    e);
+                e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -257,9 +256,9 @@ public class ClientGroupWrapper {
         }
 
         Properties keyValue = new Properties();
-        keyValue.put("producerGroup", producerGroup);
+        keyValue.put("producerGroup", group);
         keyValue.put("instanceName", EventMeshUtil
-                .buildMeshTcpClientID(sysId, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
+            .buildMeshTcpClientID(sysId, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
 
         //TODO for defibus
         keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
@@ -267,7 +266,7 @@ public class ClientGroupWrapper {
         mqProducerWrapper.init(keyValue);
         mqProducerWrapper.start();
         producerStarted.compareAndSet(false, true);
-        logger.info("starting producer success, group:{}", producerGroup);
+        logger.info("starting producer success, group:{}", group);
     }
 
     public synchronized void shutdownProducer() throws Exception {
@@ -276,29 +275,21 @@ public class ClientGroupWrapper {
         }
         mqProducerWrapper.shutdown();
         producerStarted.compareAndSet(true, false);
-        logger.info("shutdown producer success for group:{}", producerGroup);
-    }
-
-    public String getProducerGroup() {
-        return producerGroup;
-    }
-
-    public void setProducerGroup(String producerGroup) {
-        this.producerGroup = producerGroup;
+        logger.info("shutdown producer success for group:{}", group);
     }
 
-    public String getConsumerGroup() {
-        return consumerGroup;
+    public String getGroup() {
+        return group;
     }
 
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
+    public void setGroup(String group) {
+        this.group = group;
     }
 
     public boolean addGroupConsumerSession(Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(consumerGroup,
-                EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(group,
+            EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
             logger.error("addGroupConsumerSession param error,session:{}", session);
             return false;
         }
@@ -308,12 +299,12 @@ public class ClientGroupWrapper {
             this.groupLock.writeLock().lockInterruptibly();
             r = groupConsumerSessions.add(session);
             if (r) {
-                logger.info("addGroupConsumerSession success, group:{} client:{}", consumerGroup,
-                        session.getClient());
+                logger.info("addGroupConsumerSession success, group:{} client:{}", group,
+                    session.getClient());
             }
         } catch (Exception e) {
-            logger.error("addGroupConsumerSession error! group:{} client:{}", consumerGroup,
-                    session.getClient(), e);
+            logger.error("addGroupConsumerSession error! group:{} client:{}", group,
+                session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -322,8 +313,8 @@ public class ClientGroupWrapper {
 
     public boolean addGroupProducerSession(Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(producerGroup,
-                EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(group,
+            EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
             logger.error("addGroupProducerSession param error,session:{}", session);
             return false;
         }
@@ -333,12 +324,12 @@ public class ClientGroupWrapper {
             this.groupLock.writeLock().lockInterruptibly();
             r = groupProducerSessions.add(session);
             if (r) {
-                logger.info("addGroupProducerSession success, group:{} client:{}", producerGroup,
-                        session.getClient());
+                logger.info("addGroupProducerSession success, group:{} client:{}", group,
+                    session.getClient());
             }
         } catch (Exception e) {
-            logger.error("addGroupProducerSession error! group:{} client:{}", producerGroup,
-                    session.getClient(), e);
+            logger.error("addGroupProducerSession error! group:{} client:{}", group,
+                session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -347,8 +338,8 @@ public class ClientGroupWrapper {
 
     public boolean removeGroupConsumerSession(Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(consumerGroup,
-                EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(group,
+            EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
             logger.error("removeGroupConsumerSession param error,session:{}", session);
             return false;
         }
@@ -358,12 +349,12 @@ public class ClientGroupWrapper {
             this.groupLock.writeLock().lockInterruptibly();
             r = groupConsumerSessions.remove(session);
             if (r) {
-                logger.info("removeGroupConsumerSession success, group:{} client:{}", consumerGroup,
-                        session.getClient());
+                logger.info("removeGroupConsumerSession success, group:{} client:{}", group,
+                    session.getClient());
             }
         } catch (Exception e) {
-            logger.error("removeGroupConsumerSession error! group:{} client:{}", consumerGroup,
-                    session.getClient(), e);
+            logger.error("removeGroupConsumerSession error! group:{} client:{}", group,
+                session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -372,8 +363,8 @@ public class ClientGroupWrapper {
 
     public boolean removeGroupProducerSession(Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(producerGroup,
-                EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(group,
+            EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
             logger.error("removeGroupProducerSession param error,session:{}", session);
             return false;
         }
@@ -383,12 +374,12 @@ public class ClientGroupWrapper {
             this.groupLock.writeLock().lockInterruptibly();
             r = groupProducerSessions.remove(session);
             if (r) {
-                logger.info("removeGroupProducerSession success, group:{} client:{}", producerGroup,
-                        session.getClient());
+                logger.info("removeGroupProducerSession success, group:{} client:{}", group,
+                    session.getClient());
             }
         } catch (Exception e) {
-            logger.error("removeGroupProducerSession error! group:{} client:{}", producerGroup,
-                    session.getClient(), e);
+            logger.error("removeGroupProducerSession error! group:{} client:{}", group,
+                session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -403,15 +394,15 @@ public class ClientGroupWrapper {
 
         Properties keyValue = new Properties();
         keyValue.put("isBroadcast", "false");
-        keyValue.put("consumerGroup", consumerGroup);
+        keyValue.put("consumerGroup", group);
         keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
         keyValue.put("instanceName", EventMeshUtil
-                .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+            .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
 
         persistentMsgConsumer.init(keyValue);
 
         inited4Persistent.compareAndSet(false, true);
-        logger.info("init persistentMsgConsumer success, group:{}", consumerGroup);
+        logger.info("init persistentMsgConsumer success, group:{}", group);
     }
 
     public synchronized void startClientGroupPersistentConsumer() throws Exception {
@@ -420,7 +411,7 @@ public class ClientGroupWrapper {
         }
         persistentMsgConsumer.start();
         started4Persistent.compareAndSet(false, true);
-        logger.info("starting persistentMsgConsumer success, group:{}", consumerGroup);
+        logger.info("starting persistentMsgConsumer success, group:{}", group);
     }
 
     public synchronized void initClientGroupBroadcastConsumer() throws Exception {
@@ -430,14 +421,14 @@ public class ClientGroupWrapper {
 
         Properties keyValue = new Properties();
         keyValue.put("isBroadcast", "true");
-        keyValue.put("consumerGroup", consumerGroup);
+        keyValue.put("consumerGroup", group);
         keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
         keyValue.put("instanceName", EventMeshUtil
-                .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+            .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
         broadCastMsgConsumer.init(keyValue);
 
         inited4Broadcast.compareAndSet(false, true);
-        logger.info("init broadCastMsgConsumer success, group:{}", consumerGroup);
+        logger.info("init broadCastMsgConsumer success, group:{}", group);
     }
 
     public synchronized void startClientGroupBroadcastConsumer() throws Exception {
@@ -446,122 +437,123 @@ public class ClientGroupWrapper {
         }
         broadCastMsgConsumer.start();
         started4Broadcast.compareAndSet(false, true);
-        logger.info("starting broadCastMsgConsumer success, group:{}", consumerGroup);
+        logger.info("starting broadCastMsgConsumer success, group:{}", group);
     }
 
     public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
         EventListener listener = null;
         if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
-            listener = new EventListener() {
-                @Override
-                public void consume(CloudEvent event, AsyncConsumeContext context) {
-
-                    eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum().incrementAndGet();
-                    event = CloudEventBuilder.from(event)
-                        .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
-                            String.valueOf(System.currentTimeMillis()))
-                        .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                            eventMeshTCPConfiguration.eventMeshServerIp).build();
-                    String topic = event.getSubject();
-                    //    message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
-                    //message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
-                    //    String.valueOf(System.currentTimeMillis()));
-                    //message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                    //    eventMeshTCPConfiguration.eventMeshServerIp);
-
-                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
-                            (EventMeshAsyncConsumeContext) context;
-                    if (CollectionUtils.isEmpty(groupConsumerSessions)) {
-                        logger.warn("found no session to downstream broadcast msg");
-                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
-                        return;
-                    }
+            listener = (event, context) -> {
 
-                    Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
+                eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
+                    .incrementAndGet();
+                event = CloudEventBuilder.from(event)
+                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                        String.valueOf(System.currentTimeMillis()))
+                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                        eventMeshTCPConfiguration.eventMeshServerIp).build();
+                String topic = event.getSubject();
+                //    message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+                //message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                //    String.valueOf(System.currentTimeMillis()));
+                //message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                //    eventMeshTCPConfiguration.eventMeshServerIp);
+
+                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+                    (EventMeshAsyncConsumeContext) context;
+                if (CollectionUtils.isEmpty(groupConsumerSessions)) {
+                    logger.warn("found no session to downstream broadcast msg");
+                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
+                    return;
+                }
 
-                    DownStreamMsgContext downStreamMsgContext =
-                            new DownStreamMsgContext(event, null, broadCastMsgConsumer,
-                                    eventMeshAsyncConsumeContext.getAbstractContext(), false,
-                                    subscriptionItem);
+                Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
 
-                    while (sessionsItr.hasNext()) {
-                        Session session = sessionsItr.next();
+                DownStreamMsgContext downStreamMsgContext =
+                    new DownStreamMsgContext(event, null, broadCastMsgConsumer,
+                        eventMeshAsyncConsumeContext.getAbstractContext(), false,
+                        subscriptionItem);
 
-                        if (!session.isAvailable(topic)) {
-                            logger
-                                    .warn("downstream broadcast msg,session is not available,client:{}",
-                                            session.getClient());
-                            continue;
-                        }
+                while (sessionsItr.hasNext()) {
+                    Session session = sessionsItr.next();
 
-                        downStreamMsgContext.session = session;
-
-                        //downstream broadcast msg asynchronously
-                        eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
-                                .submit(new Runnable() {
-                                    @Override
-                                    public void run() {
-                                        //msg put in eventmesh,waiting client ack
-                                        session.getPusher()
-                                                .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
-                                        session.downstreamMsg(downStreamMsgContext);
-                                    }
-                                });
+                    if (!session.isAvailable(topic)) {
+                        logger
+                            .warn("downstream broadcast msg,session is not available,client:{}",
+                                session.getClient());
+                        continue;
                     }
 
-                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
+                    downStreamMsgContext.session = session;
+
+                    //downstream broadcast msg asynchronously
+                    eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
+                        .submit(new Runnable() {
+                            @Override
+                            public void run() {
+                                //msg put in eventmesh,waiting client ack
+                                session.getPusher()
+                                    .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
+                                session.downstreamMsg(downStreamMsgContext);
+                            }
+                        });
                 }
+
+                eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
             };
             broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
         } else {
             listener = (event, context) -> {
-                eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum().incrementAndGet();
+                eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
+                    .incrementAndGet();
                 event = CloudEventBuilder.from(event)
-                        .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
-                                String.valueOf(System.currentTimeMillis()))
-                        .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                                eventMeshTCPConfiguration.eventMeshServerIp).build();
+                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                        String.valueOf(System.currentTimeMillis()))
+                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                        eventMeshTCPConfiguration.eventMeshServerIp).build();
                 String topic = event.getSubject();
 
                 EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
-                        (EventMeshAsyncConsumeContext) context;
+                    (EventMeshAsyncConsumeContext) context;
                 Session session = downstreamDispatchStrategy
-                        .select(consumerGroup, topic, groupConsumerSessions);
+                    .select(group, topic, groupConsumerSessions);
                 String bizSeqNo = EventMeshUtil.getMessageBizSeq(event);
                 if (session == null) {
                     try {
                         Integer sendBackTimes = 0;
                         String sendBackFromEventMeshIp = "";
                         if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
-                                EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) {
-                            sendBackTimes = (Integer) event.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
+                            EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) {
+                            sendBackTimes = (Integer) event.getExtension(
+                                EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
                         }
                         if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
-                                EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) {
-                            sendBackFromEventMeshIp = (String) event.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP);
+                            EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) {
+                            sendBackFromEventMeshIp = (String) event.getExtension(
+                                EventMeshConstants.EVENTMESH_SEND_BACK_IP);
                         }
 
                         logger.error(
-                                "found no session to downstream msg,groupName:{}, topic:{}, "
-                                        + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
-                                consumerGroup, topic, bizSeqNo, sendBackTimes,
-                                sendBackFromEventMeshIp);
+                            "found no session to downstream msg,groupName:{}, topic:{}, "
+                                + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
+                            group, topic, bizSeqNo, sendBackTimes,
+                            sendBackFromEventMeshIp);
 
                         if (sendBackTimes >= eventMeshTCPServer
-                                .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
+                            .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
                             logger.error(
-                                    "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
-                                            + "bizSeqNo:{}", eventMeshTCPServer
-                                            .getEventMeshTCPConfiguration()
-                                            .eventMeshTcpSendBackMaxTimes,
-                                    consumerGroup, topic, bizSeqNo);
+                                "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
+                                    + "bizSeqNo:{}", eventMeshTCPServer
+                                    .getEventMeshTCPConfiguration()
+                                    .eventMeshTcpSendBackMaxTimes,
+                                group, topic, bizSeqNo);
                         } else {
                             sendBackTimes++;
                             event = CloudEventBuilder.from(event)
-                                    .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
-                                            sendBackTimes.toString())
-                                    .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
-                                            eventMeshTCPConfiguration.eventMeshServerIp).build();
+                                .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
+                                    sendBackTimes.toString())
+                                .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
+                                    eventMeshTCPConfiguration.eventMeshServerIp).build();
                             sendMsgBackToBroker(event, bizSeqNo);
                         }
                     } catch (Exception e) {
@@ -573,9 +565,9 @@ public class ClientGroupWrapper {
                 }
 
                 DownStreamMsgContext downStreamMsgContext =
-                        new DownStreamMsgContext(event, session, persistentMsgConsumer,
-                                eventMeshAsyncConsumeContext.getAbstractContext(), false,
-                                subscriptionItem);
+                    new DownStreamMsgContext(event, session, persistentMsgConsumer,
+                        eventMeshAsyncConsumeContext.getAbstractContext(), false,
+                        subscriptionItem);
                 //msg put in eventmesh,waiting client ack
                 session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
                 session.downstreamMsg(downStreamMsgContext);
@@ -596,7 +588,7 @@ public class ClientGroupWrapper {
     public synchronized void shutdownBroadCastConsumer() throws Exception {
         if (started4Broadcast.get()) {
             broadCastMsgConsumer.shutdown();
-            logger.info("broadcast consumer group:{} shutdown...", consumerGroup);
+            logger.info("broadcast consumer group:{} shutdown...", group);
         }
         started4Broadcast.compareAndSet(true, false);
         inited4Broadcast.compareAndSet(true, false);
@@ -607,7 +599,7 @@ public class ClientGroupWrapper {
 
         if (started4Persistent.get()) {
             persistentMsgConsumer.shutdown();
-            logger.info("persistent consumer group:{} shutdown...", consumerGroup);
+            logger.info("persistent consumer group:{} shutdown...", group);
         }
         started4Persistent.compareAndSet(true, false);
         inited4Persistent.compareAndSet(true, false);
@@ -651,7 +643,7 @@ public class ClientGroupWrapper {
     }
 
     public void setDownstreamDispatchStrategy(
-            DownstreamDispatchStrategy downstreamDispatchStrategy) {
+        DownstreamDispatchStrategy downstreamDispatchStrategy) {
         this.downstreamDispatchStrategy = downstreamDispatchStrategy;
     }
 
@@ -662,24 +654,24 @@ public class ClientGroupWrapper {
     private String pushMsgToEventMesh(CloudEvent msg, String ip, int port) throws Exception {
         StringBuilder targetUrl = new StringBuilder();
         targetUrl.append("http://").append(ip).append(":").append(port)
-                .append("/eventMesh/msg/push");
+            .append("/eventMesh/msg/push");
         HttpTinyClient.HttpResult result = null;
 
         try {
             logger.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl,
-                    msg);
+                msg);
             List<String> paramValues = new ArrayList<String>();
             paramValues.add("msg");
             paramValues.add(JsonUtils.serialize(msg));
             paramValues.add("group");
-            paramValues.add(consumerGroup);
+            paramValues.add(group);
 
             result = HttpTinyClient.httpPost(
-                    targetUrl.toString(),
-                    null,
-                    paramValues,
-                    "UTF-8",
-                    3000);
+                targetUrl.toString(),
+                null,
+                paramValues,
+                "UTF-8",
+                3000);
         } catch (Exception e) {
             logger.error("httpPost " + targetUrl + " is fail,", e);
             //throw new RuntimeException("httpPost " + targetUrl + " is fail," , e);
@@ -691,7 +683,7 @@ public class ClientGroupWrapper {
 
         } else {
             throw new Exception("httpPost targetUrl[" + targetUrl
-                    + "] is not OK when getContentThroughHttp, httpResult: " + result + ".");
+                + "] is not OK when getContentThroughHttp, httpResult: " + result + ".");
         }
     }
 
@@ -707,22 +699,22 @@ public class ClientGroupWrapper {
             long startTime = System.currentTimeMillis();
             long taskExcuteTime = startTime;
             send(new UpStreamMsgContext(null, event, null, startTime, taskExcuteTime),
-                    new SendCallback() {
-                        @Override
-                        public void onSuccess(SendResult sendResult) {
-                            logger.info(
-                                    "consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, "
-                                            + "topic:{}", consumerGroup, bizSeqNo, topic);
-                        }
+                new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        logger.info(
+                            "group:{} consume fail, sendMessageBack success, bizSeqno:{}, "
+                                + "topic:{}", group, bizSeqNo, topic);
+                    }
 
-                        @Override
-                        public void onException(OnExceptionContext context) {
-                            logger.warn(
-                                    "consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{},"
-                                            + " topic:{}", consumerGroup, bizSeqNo, topic);
-                        }
+                    @Override
+                    public void onException(OnExceptionContext context) {
+                        logger.warn(
+                            "group:{} consume fail, sendMessageBack fail, bizSeqno:{},"
+                                + " topic:{}", group, bizSeqNo, topic);
+                    }
 
-                    });
+                });
             eventMeshTcpMonitor.getTcpSummaryMetrics().getEventMesh2mqMsgNum().incrementAndGet();
         } catch (Exception e) {
             logger.warn("try send msg back to broker failed");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 548d7e5..003186e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -179,29 +179,29 @@ public class ClientSessionGroupMapping {
         }
     }
 
-    private ClientGroupWrapper constructClientGroupWrapper(String sysId, String producerGroup, String consumerGroup,
+    private ClientGroupWrapper constructClientGroupWrapper(String sysId, String group,
                                                            EventMeshTCPServer eventMeshTCPServer,
                                                            DownstreamDispatchStrategy downstreamDispatchStrategy) {
-        return new ClientGroupWrapper(sysId, producerGroup, consumerGroup, eventMeshTCPServer,
+        return new ClientGroupWrapper(sysId, group, eventMeshTCPServer,
                 downstreamDispatchStrategy);
     }
 
     private void initClientGroupWrapper(UserAgent user, Session session) throws Exception {
-        if (!lockMap.containsKey(user.getSubsystem())) {
-            Object obj = lockMap.putIfAbsent(user.getSubsystem(), new Object());
+        if (!lockMap.containsKey(user.getGroup())) {
+            Object obj = lockMap.putIfAbsent(user.getGroup(), new Object());
             if (obj == null) {
-                logger.info("add lock to map for subsystem:{}", user.getSubsystem());
+                logger.info("add lock to map for group:{}", user.getGroup());
             }
         }
-        synchronized (lockMap.get(user.getSubsystem())) {
-            if (!clientGroupMap.containsKey(user.getSubsystem())) {
-                ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getProducerGroup(),
-                        user.getConsumerGroup(), eventMeshTCPServer, new FreePriorityDispatchStrategy());
-                clientGroupMap.put(user.getSubsystem(), cgw);
-                logger.info("create new ClientGroupWrapper, subsystem:{}", user.getSubsystem());
+        synchronized (lockMap.get(user.getGroup())) {
+            if (!clientGroupMap.containsKey(user.getGroup())) {
+                ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getGroup(),
+                        eventMeshTCPServer, new FreePriorityDispatchStrategy());
+                clientGroupMap.put(user.getGroup(), cgw);
+                logger.info("create new ClientGroupWrapper, group:{}", user.getGroup());
             }
 
-            ClientGroupWrapper cgw = clientGroupMap.get(user.getSubsystem());
+            ClientGroupWrapper cgw = clientGroupMap.get(user.getGroup());
 
             if (EventMeshConstants.PURPOSE_PUB.equals(user.getPurpose())) {
                 startClientGroupProducer(cgw, session);
@@ -307,7 +307,7 @@ public class ClientSessionGroupMapping {
                     continue;
                 }
                 Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy()
-                        .select(session.getClientGroupWrapper().get().getConsumerGroup(),
+                        .select(session.getClientGroupWrapper().get().getGroup(),
                                 downStreamMsgContext.event.getSubject(),
                                 Objects.requireNonNull(session.getClientGroupWrapper().get()).groupConsumerSessions);
                 if (reChooseSession != null) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
index 4d25335..12ca6ff 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
@@ -115,14 +115,9 @@ public class HelloTask extends AbstractTask {
             throw new Exception("client purpose config is error");
         }
 
-        if (StringUtils.equals(EventMeshConstants.PURPOSE_PUB, user.getPurpose())
-                && StringUtils.isBlank(user.getProducerGroup())) {
-            throw new Exception("client producerGroup cannot be null");
+        if (StringUtils.isBlank(user.getGroup())) {
+            throw new Exception("client group cannot be null");
         }
 
-        if (StringUtils.equals(EventMeshConstants.PURPOSE_SUB, user.getPurpose())
-                && StringUtils.isBlank(user.getConsumerGroup())) {
-            throw new Exception("client consumerGroup cannot be null");
-        }
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java
index da595ba..6499e8e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java
@@ -97,12 +97,6 @@ public class RecommendTask extends AbstractTask {
         if (userAgent == null) {
             return null;
         }
-        if (EventMeshConstants.PURPOSE_PUB.equals(userAgent.getPurpose())) {
-            return userAgent.getProducerGroup();
-        } else if (EventMeshConstants.PURPOSE_SUB.equals(userAgent.getPurpose())) {
-            return userAgent.getConsumerGroup();
-        } else {
-            return null;
-        }
+        return userAgent.getGroup();
     }
 }
\ No newline at end of file
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
index 96b6ad7..e006fa8 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
@@ -150,8 +150,7 @@ public class MessageUtils {
                 .pid(agent.getPid())
                 .version(agent.getVersion())
                 .idc(agent.getIdc())
-                .consumerGroup(agent.getConsumerGroup())
-                .producerGroup(agent.getProducerGroup())
+                .group(agent.getGroup())
                 .purpose(EventMeshCommon.USER_AGENT_PURPOSE_SUB)
                 .build();
     }
@@ -168,8 +167,7 @@ public class MessageUtils {
                 .pid(agent.getPid())
                 .version(agent.getVersion())
                 .idc(agent.getIdc())
-                .consumerGroup(agent.getConsumerGroup())
-                .producerGroup(agent.getProducerGroup())
+                .group(agent.getGroup())
                 .purpose(EventMeshCommon.USER_AGENT_PURPOSE_PUB)
                 .build();
     }
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java
index 94d273b..42db31b 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java
@@ -38,8 +38,7 @@ public class EventMeshTestUtils {
                 .host("127.0.0.1")
                 .password(generateRandomString(8))
                 .username("PU4283")
-                .consumerGroup("EventmeshTest-ConsumerGroup")
-                .producerGroup("EventmeshTest-ProducerGroup")
+                .group("EventmeshTestGroup")
                 .path("/data/app/umg_proxy")
                 .port(8362)
                 .subsystem("5023")
@@ -54,8 +53,7 @@ public class EventMeshTestUtils {
                 .host("127.0.0.1")
                 .password(generateRandomString(8))
                 .username("PU4283")
-                .consumerGroup("EventmeshTest-ConsumerGroup")
-                .producerGroup("EventmeshTest-ProducerGroup")
+                .group("EventmeshTestGroup")
                 .path("/data/app/umg_proxy")
                 .port(9362)
                 .subsystem("5017")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org