You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/06 07:00:32 UTC

[rocketmq] branch develop updated: [ISSUE #5896] feat:add pop consumer example (#5991)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7a05483c9 [ISSUE #5896] feat:add pop consumer example (#5991)
7a05483c9 is described below

commit 7a05483c9f4772ba84bdeb1934b94e84229b7875
Author: mahaitao <15...@163.com>
AuthorDate: Mon Feb 6 15:00:20 2023 +0800

    [ISSUE #5896] feat:add pop consumer example (#5991)
    
    * feat:add pop consumer
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * Update example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
    
    Co-authored-by: Oliver <wq...@163.com>
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    ---------
    
    Co-authored-by: mahaitao617 <ma...@mahaitao617deMacBook-Pro.local>
    Co-authored-by: Oliver <wq...@163.com>
---
 example/pom.xml                                    |  8 +++
 .../rocketmq/example/simple/PopConsumer.java       | 62 ++++++++++++++++++++++
 2 files changed, 70 insertions(+)

diff --git a/example/pom.xml b/example/pom.xml
index ccbfdb666..07cf59082 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -36,6 +36,14 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-srvutil</artifactId>
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PopConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PopConsumer.java
new file mode 100644
index 000000000..6321e36d9
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PopConsumer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+/**
+ * Example showing how to consume messages in POP request mode.
+ *
+ * <p>The example first asks every broker that serves {@link #TOPIC} to use
+ * {@link MessageRequestMode#POP} for {@link #CONSUMER_GROUP}, then starts a
+ * {@link DefaultMQPushConsumer} subscribed to that topic.
+ */
+public class PopConsumer {
+    /** Topic the example subscribes to and switches to POP mode. */
+    public static final String TOPIC = "TopicTest";
+    /** Consumer group whose request mode is switched and which the consumer joins. */
+    public static final String CONSUMER_GROUP = "CID_JODIE_1";
+
+    /**
+     * Switches the consumer group to POP mode and starts a consumer that prints every
+     * batch of messages it receives.
+     *
+     * @param args command-line arguments; not used
+     * @throws Exception if the mode switch, the subscription or the consumer start-up fails
+     */
+    public static void main(String[] args) throws Exception {
+        // The request mode must be switched on the brokers before the consumer starts.
+        switchPop();
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
+        consumer.subscribe(TOPIC, "*");
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        // NOTE(review): client-side rebalance is disabled here, presumably so that the
+        // broker-side assignment used by POP mode takes effect - confirm against the client docs.
+        consumer.setClientRebalance(false);
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+
+    /**
+     * Sets the request mode of {@link #CONSUMER_GROUP} on {@link #TOPIC} to
+     * {@link MessageRequestMode#POP} on every broker address found in the topic's route info.
+     *
+     * <p>The admin client is used only for this one-off switch and is always shut down
+     * before returning, including when a lookup or a broker call fails.
+     *
+     * @throws Exception if the admin client cannot start, the route lookup fails, or a broker
+     *                   rejects the mode change
+     */
+    private static void switchPop() throws Exception {
+        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.start();
+        try {
+            List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(TOPIC).getBrokerDatas();
+            for (BrokerData brokerData : brokerDatas) {
+                // De-duplicate the addresses so each broker address is contacted only once.
+                Set<String> brokerAddrs = new HashSet<>(brokerData.getBrokerAddrs().values());
+                for (String brokerAddr : brokerAddrs) {
+                    // NOTE(review): 8 is presumably the pop share queue number and 3_000 the
+                    // request timeout in milliseconds - confirm against DefaultMQAdminExt.
+                    mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
+                }
+            }
+        } finally {
+            // Release the admin client's resources; it is not needed once the mode is switched.
+            mqAdminExt.shutdown();
+        }
+    }
+}