You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/06 02:52:24 UTC

[rocketmq] branch develop updated: [ISSUE #4323] Broadcast example add the default NamesrvAddr (#4344)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 97709a8b8 [ISSUE #4323] Broadcast example add the default NamesrvAddr (#4344)
97709a8b8 is described below

commit 97709a8b8e38ebe13984dde59ef0fbb6b4a0f35b
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Mon Jun 6 10:52:18 2022 +0800

    [ISSUE #4323] Broadcast example add the default NamesrvAddr (#4344)
    
    * Broadcast example add the default NamesrvAddr
    
    * Broadcast example add the default NamesrvAddr
    
    * update annotation
---
 .../rocketmq/example/broadcast/PushConsumer.java   | 28 ++++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
index 28e02341c..e2a54137b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
@@ -16,35 +16,37 @@
  */
 package org.apache.rocketmq.example.broadcast;
 
-import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 
 public class PushConsumer {
 
+    public static final String CONSUMER_GROUP = "please_rename_unique_group_name_1";
+    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+    public static final String TOPIC = "TopicTest";
+
+    public static final String SUB_EXPRESSION = "TagA || TagC || TagD";
+
     public static void main(String[] args) throws InterruptedException, MQClientException {
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
+
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
+
+        // Uncomment the following line while debugging; namesrvAddr should be set to your local address.
+//        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
 
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
         consumer.setMessageModel(MessageModel.BROADCASTING);
 
-        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
-
-        consumer.registerMessageListener(new MessageListenerConcurrently() {
+        consumer.subscribe(TOPIC, SUB_EXPRESSION);
 
-            @Override
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
-                ConsumeConcurrentlyContext context) {
-                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-            }
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         });
 
         consumer.start();