You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/06/08 03:05:27 UTC

[1/2] incubator-rocketmq-site git commit: Update example of scheduled message

Repository: incubator-rocketmq-site
Updated Branches:
  refs/heads/master a84f68daa -> 10f8abe52


Update example of scheduled message


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/3bb95024
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/3bb95024
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/3bb95024

Branch: refs/heads/master
Commit: 3bb950241e1e2d07aed7f7ed788d968a3dcece97
Parents: a84f68d
Author: Li Zhanhui <li...@gmail.com>
Authored: Wed Jun 7 11:57:11 2017 +0800
Committer: Li Zhanhui <li...@gmail.com>
Committed: Wed Jun 7 11:57:11 2017 +0800

----------------------------------------------------------------------
 _docs/17-rmq-schedule-example.md | 105 +++++++++++++++++++---------------
 1 file changed, 58 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/3bb95024/_docs/17-rmq-schedule-example.md
----------------------------------------------------------------------
diff --git a/_docs/17-rmq-schedule-example.md b/_docs/17-rmq-schedule-example.md
index e9610ee..0340f21 100644
--- a/_docs/17-rmq-schedule-example.md
+++ b/_docs/17-rmq-schedule-example.md
@@ -11,61 +11,72 @@ modified: 2017-04-24T15:01:43-04:00
 
 
 ### What is scheduled message?
-Scheduled messages differ from normal messages such that they won't be delivered until a provided time later.
-If you use `DefaultMQPullConsumer` to consume message, you have to fetch message manually. There are other options availible but  `MQPullConsumerScheduleService` is the easiest.
 
-#### DefaultMQPullConsumer use case
+Scheduled messages differ from normal messages in that they won't be delivered until a provided time later.
 
-> First fetch subscribed queues of a topic
+### Application
+
+1. Start consumer to wait for incoming subscribed messages
 
 ```java
-Set<MessageQueue> testTopic = consumer.fetchSubscribeMessageQueues("testTopic");
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+
+public class ScheduledMessageConsumer {
+
+    public static void main(String[] args) throws Exception {
+        // Instantiate message consumer
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
+        // Subscribe topics
+        consumer.subscribe("TestTopic", "*");
+        // Register message listener
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
+                for (MessageExt message : messages) {
+                    // Print approximate delay time period
+                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+                            + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        // Launch consumer
+        consumer.start();
+    }
+}
 ```
 
-> Second chose a queue to fetch message,and save queue offset manually.
 
-#### Use MQPullConsumerScheduleService consume message
+2. Send scheduled messages
 
 ```java
-final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
-
-scheduleService.setMessageModel(MessageModel.CLUSTERING);
-scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {
-
-    @Override
-    public void doPullTask(MessageQueue mq, PullTaskContext context) {
-        MQPullConsumer consumer = context.getPullConsumer();
-        try {
-
-            long offset = consumer.fetchConsumeOffset(mq, false);
-            if (offset < 0)
-                offset = 0;
-
-            PullResult pullResult = consumer.pull(mq, "*", offset, 32);
-            System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
-            switch (pullResult.getPullStatus()) {
-                case FOUND:
-                    break;
-                case NO_MATCHED_MSG:
-                    break;
-                case NO_NEW_MSG:
-                case OFFSET_ILLEGAL:
-                    break;
-                default:
-                    break;
-            }
-            consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
-
-			//consume message auto
-            context.setPullNextDelayTimeMillis(100);
-        } catch (Exception e) {
-            e.printStackTrace();
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class ScheduledMessageProducer {
+
+    public static void main(String[] args) throws Exception {
+        // Instantiate a producer to send scheduled messages
+        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
+        // Launch producer
+        producer.start();
+        int totalMessagesToSend = 100;
+        for (int i = 0; i < totalMessagesToSend; i++) {
+            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
+            // This message will be delivered to consumer 10 seconds later.
+            message.setDelayTimeLevel(3);
+            // Send the message
+            producer.send(message);
         }
-    }
-});
-
-scheduleService.start();
-```
-
-#### Have fun with `MQPullConsumerScheduleService`.
 
+        // Shutdown producer after use.
+        producer.shutdown();
+    }
+    
+}
+```
\ No newline at end of file


[2/2] incubator-rocketmq-site git commit: Add a verfication section

Posted by li...@apache.org.
Add a verfication section


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/10f8abe5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/10f8abe5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/10f8abe5

Branch: refs/heads/master
Commit: 10f8abe52ce5c1fe81281400ef8d2fc9b77b76b9
Parents: 3bb9502
Author: Li Zhanhui <li...@gmail.com>
Authored: Wed Jun 7 12:41:13 2017 +0800
Committer: Li Zhanhui <li...@gmail.com>
Committed: Wed Jun 7 12:41:13 2017 +0800

----------------------------------------------------------------------
 .gitignore                       | 1 +
 _docs/17-rmq-schedule-example.md | 6 +++++-
 2 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/10f8abe5/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 928dad0..ed99deb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,4 @@ example/_site
 Gemfile.lock
 node_modules
 npm-debug.log*
+*.iml

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/10f8abe5/_docs/17-rmq-schedule-example.md
----------------------------------------------------------------------
diff --git a/_docs/17-rmq-schedule-example.md b/_docs/17-rmq-schedule-example.md
index 0340f21..1fa1d64 100644
--- a/_docs/17-rmq-schedule-example.md
+++ b/_docs/17-rmq-schedule-example.md
@@ -79,4 +79,8 @@ public class ScheduledMessageProducer {
     }
     
 }
-```
\ No newline at end of file
+```
+
+3. Verification 
+
+You should see messages are consumed about 10 seconds later than their storing time. 
\ No newline at end of file