Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/18 11:37:24 UTC

[rocketmq] branch develop updated: [RIP-9] Add a delay example in RocketMQ

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new deaece5  [RIP-9] Add a delay example in RocketMQ
deaece5 is described below

commit deaece56941b1df36605a924ebc3e9627dc35f5c
Author: ThailandKing <38...@users.noreply.github.com>
AuthorDate: Mon Feb 18 19:37:19 2019 +0800

    [RIP-9] Add a delay example in RocketMQ
    
    [RIP-9] Add a delay example in RocketMQ
---
 docs/en/Example_Delay.md | 85 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 85 insertions(+)

diff --git a/docs/en/Example_Delay.md b/docs/en/Example_Delay.md
new file mode 100644
index 0000000..d59c2c5
--- /dev/null
+++ b/docs/en/Example_Delay.md
@@ -0,0 +1,85 @@
+# Schedule example
+
+### 1、Start consumer to wait for incoming subscribed messages 
+
+```java
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+
+public class ScheduledMessageConsumer {
+
+    public static void main(String[] args) throws Exception {
+        // Instantiate message consumer
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
+        // Subscribe topics
+        consumer.subscribe("TestTopic", "*");
+        // Register message listener
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
+                for (MessageExt message : messages) {
+                    // Print approximate delay time period
+                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+                                       + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        // Launch consumer
+        consumer.start();
+    }
+}
+```
+
+### 2、Send scheduled messages 
+
+```java
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class ScheduledMessageProducer {
+
+    public static void main(String[] args) throws Exception {
+        // Instantiate a producer to send scheduled messages
+        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
+        // Launch producer
+        producer.start();
+        int totalMessagesToSend = 100;
+        for (int i = 0; i < totalMessagesToSend; i++) {
+            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
+            // This message will be delivered to consumer 10 seconds later.
+            message.setDelayTimeLevel(3);
+            // Send the message
+            producer.send(message);
+        }
+
+        // Shutdown producer after use.
+        producer.shutdown();
+    }
+
+}
+```
+
+### 3、Verification 
+
+You should see that each message is consumed about 10 seconds after its store time.
+
+### 4、Use scenarios for scheduled messages
+
+For example, in e-commerce, when an order is submitted, a delayed message can be sent so that the order status is checked 1 hour later. If the order is still unpaid at that point, it can be cancelled and the inventory released.
+
+### 5、Restrictions on the use of scheduled messages
+
+```java 
+// org/apache/rocketmq/store/config/MessageStoreConfig.java
+
+private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
+```
+
+Currently RocketMQ does not support arbitrary delay times. Only a set of fixed delay levels is available; with the default configuration above, levels 1 to 18 correspond to delays from 1s to 2h. Messages that fail to be consumed also enter the delay message queue for retry, so the time at which a message is delivered depends on the configured delay level and the number of retries.
+
+See `SendMessageProcessor.java`.
\ No newline at end of file
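
A note on section 5 of the added document: the default `messageDelayLevel` string assigns each 1-based level a fixed delay, which is why `setDelayTimeLevel(3)` in the producer example gives a delay of roughly 10 seconds. The mapping can be sketched as follows. This is a minimal standalone illustration, not the broker's actual parsing code; the class name `DelayLevelTable` and its `parse` method are hypothetical, and only the `s`, `m` and `h` units that appear in the default string are handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DelayLevelTable {

    // Default value copied from MessageStoreConfig.messageDelayLevel as quoted in the patch.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /**
     * Hypothetical helper: turns a space-separated level string into a map
     * from 1-based delay level to delay in milliseconds.
     */
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            // Everything except the last character is the amount, the last character is the unit.
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default:
                    throw new IllegalArgumentException("Unknown time unit in: " + part);
            }
            // Levels are 1-based: the first entry in the string is level 1.
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        for (Map.Entry<Integer, Long> entry : table.entrySet()) {
            System.out.println("level " + entry.getKey() + " -> " + entry.getValue() + " ms");
        }
    }
}
```

With the default string, the table has 18 entries: level 3 maps to 10000 ms and level 18 to 7200000 ms. A delay that is not in the table (for example 45 seconds) has no matching level, which is the restriction section 5 describes.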