You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/01 05:46:40 UTC
[rocketmq] branch develop updated: [ISSUE #4323] Schedule example add the default NamesrvAddr (#4352)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new febb21092 [ISSUE #4323] Schedule example add the default NamesrvAddr (#4352)
febb21092 is described below
commit febb21092bf54069ac8a5b68b3f99318c502da32
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Wed Jun 1 13:46:30 2022 +0800
[ISSUE #4323] Schedule example add the default NamesrvAddr (#4352)
* Schedule example add the default NamesrvAddr
* Schedule example add the default NamesrvAddr
* update annotation
---
.../example/schedule/ScheduledMessageConsumer.java | 32 ++++++++++++----------
.../example/schedule/ScheduledMessageProducer.java | 21 ++++++++++----
2 files changed, 33 insertions(+), 20 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageConsumer.java b/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageConsumer.java
index fdb4c86eb..b3fab65f0 100644
--- a/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageConsumer.java
@@ -17,31 +17,33 @@
package org.apache.rocketmq.example.schedule;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
-import java.util.List;
-
public class ScheduledMessageConsumer {
-
+
+ public static final String CONSUMER_GROUP = "ExampleConsumer";
+ public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+ public static final String TOPIC = "TestTopic";
+
public static void main(String[] args) throws Exception {
// Instantiate message consumer
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
+
+ // Uncomment the following line while debugging; namesrvAddr should be set to your local address.
+// consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
+
// Subscribe topics
- consumer.subscribe("TestTopic", "*");
+ consumer.subscribe(TOPIC, "*");
// Register message listener
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
- for (MessageExt message : messages) {
- // Print approximate delay time period
- System.out.printf("Receive message[msgId=%s %d ms later]\n", message.getMsgId(),
- System.currentTimeMillis() - message.getStoreTimestamp());
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
+ for (MessageExt message : messages) {
+ // Print approximate delay time period
+ System.out.printf("Receive message[msgId=%s %d ms later]\n", message.getMsgId(),
+ System.currentTimeMillis() - message.getStoreTimestamp());
}
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// Launch consumer
consumer.start();
diff --git a/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageProducer.java b/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageProducer.java
index 0f6b722d5..994c81e64 100644
--- a/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageProducer.java
@@ -17,25 +17,36 @@
package org.apache.rocketmq.example.schedule;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ScheduledMessageProducer {
+
+ public static final String PRODUCER_GROUP = "ExampleProducerGroup";
+ public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+ public static final String TOPIC = "TestTopic";
+
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
- DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
+ DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
+
+ // Uncomment the following line while debugging; namesrvAddr should be set to your local address.
+// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
+
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
- Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
+ Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
- producer.send(message);
+ SendResult result = producer.send(message);
+ System.out.print(result);
}
-
+
// Shutdown producer after use.
producer.shutdown();
}
-
+
}
\ No newline at end of file