You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/06 02:52:24 UTC
[rocketmq] branch develop updated: [ISSUE #4323] Broadcast example add the default NamesrvAddr (#4344)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 97709a8b8 [ISSUE #4323] Broadcast example add the default NamesrvAddr (#4344)
97709a8b8 is described below
commit 97709a8b8e38ebe13984dde59ef0fbb6b4a0f35b
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Mon Jun 6 10:52:18 2022 +0800
[ISSUE #4323] Broadcast example add the default NamesrvAddr (#4344)
* Broadcast example add the default NamesrvAddr
* Broadcast example add the default NamesrvAddr
* update annotation
---
.../rocketmq/example/broadcast/PushConsumer.java | 28 ++++++++++++----------
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
index 28e02341c..e2a54137b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
@@ -16,35 +16,37 @@
*/
package org.apache.rocketmq.example.broadcast;
-import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
public class PushConsumer {
+ public static final String CONSUMER_GROUP = "please_rename_unique_group_name_1";
+ public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+ public static final String TOPIC = "TopicTest";
+
+ public static final String SUB_EXPRESSION = "TagA || TagC || TagD";
+
public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
+
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
+
+ // Uncomment the following line while debugging; namesrvAddr should be set to your local address.
+// consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
- consumer.subscribe("TopicTest", "TagA || TagC || TagD");
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
+ consumer.subscribe(TOPIC, SUB_EXPRESSION);
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();