You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/06 07:00:32 UTC
[rocketmq] branch develop updated: [ISSUE #5896] feat:add pop consumer example (#5991)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7a05483c9 [ISSUE #5896] feat:add pop consumer example (#5991)
7a05483c9 is described below
commit 7a05483c9f4772ba84bdeb1934b94e84229b7875
Author: mahaitao <15...@163.com>
AuthorDate: Mon Feb 6 15:00:20 2023 +0800
[ISSUE #5896] feat:add pop consumer example (#5991)
* feat:add pop consumer
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* Update example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
Co-authored-by: Oliver <wq...@163.com>
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
* feat:fix
---------
Co-authored-by: mahaitao617 <ma...@mahaitao617deMacBook-Pro.local>
Co-authored-by: Oliver <wq...@163.com>
---
example/pom.xml | 8 +++
.../rocketmq/example/simple/PopConsumer.java | 62 ++++++++++++++++++++++
2 files changed, 70 insertions(+)
diff --git a/example/pom.xml b/example/pom.xml
index ccbfdb666..07cf59082 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -36,6 +36,14 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-srvutil</artifactId>
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PopConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PopConsumer.java
new file mode 100644
index 000000000..6321e36d9
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PopConsumer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+public class PopConsumer {
+ public static final String TOPIC = "TopicTest";
+ public static final String CONSUMER_GROUP = "CID_JODIE_1";
+ public static void main(String[] args) throws Exception {
+ switchPop();
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
+ consumer.subscribe(TOPIC, "*");
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ consumer.setClientRebalance(false);
+ consumer.start();
+ System.out.printf("Consumer Started.%n");
+ }
+ private static void switchPop() throws Exception {
+ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+ mqAdminExt.start();
+ List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(TOPIC).getBrokerDatas();
+ for (BrokerData brokerData : brokerDatas) {
+ Set<String> brokerAddrs = new HashSet<>(brokerData.getBrokerAddrs().values());
+ for (String brokerAddr : brokerAddrs) {
+ mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
+ }
+ }
+ }
+}