You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/06/08 03:05:27 UTC
[1/2] incubator-rocketmq-site git commit: Update example of scheduled
message
Repository: incubator-rocketmq-site
Updated Branches:
refs/heads/master a84f68daa -> 10f8abe52
Update example of scheduled message
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/3bb95024
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/3bb95024
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/3bb95024
Branch: refs/heads/master
Commit: 3bb950241e1e2d07aed7f7ed788d968a3dcece97
Parents: a84f68d
Author: Li Zhanhui <li...@gmail.com>
Authored: Wed Jun 7 11:57:11 2017 +0800
Committer: Li Zhanhui <li...@gmail.com>
Committed: Wed Jun 7 11:57:11 2017 +0800
----------------------------------------------------------------------
_docs/17-rmq-schedule-example.md | 105 +++++++++++++++++++---------------
1 file changed, 58 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/3bb95024/_docs/17-rmq-schedule-example.md
----------------------------------------------------------------------
diff --git a/_docs/17-rmq-schedule-example.md b/_docs/17-rmq-schedule-example.md
index e9610ee..0340f21 100644
--- a/_docs/17-rmq-schedule-example.md
+++ b/_docs/17-rmq-schedule-example.md
@@ -11,61 +11,72 @@ modified: 2017-04-24T15:01:43-04:00
### What is scheduled message?
-Scheduled messages differ from normal messages such that they won't be delivered until a provided time later.
-If you use `DefaultMQPullConsumer` to consume message, you have to fetch message manually. There are other options availible but `MQPullConsumerScheduleService` is the easiest.
-#### DefaultMQPullConsumer use case
+Scheduled messages differ from normal messages in that they won't be delivered until a provided time later.
-> First fetch subscribed queues of a topic
+### Application
+
+1. Start consumer to wait for incoming subscribed messages
```java
-Set<MessageQueue> testTopic = consumer.fetchSubscribeMessageQueues("testTopic");
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+
+public class ScheduledMessageConsumer {
+
+ public static void main(String[] args) throws Exception {
+ // Instantiate message consumer
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
+ // Subscribe topics
+ consumer.subscribe("TestTopic", "*");
+ // Register message listener
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
+ for (MessageExt message : messages) {
+ // Print approximate delay time period
+ System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ // Launch consumer
+ consumer.start();
+ }
+}
```
-> Second chose a queue to fetch message,and save queue offset manually.
-#### Use MQPullConsumerScheduleService consume message
+2. Send scheduled messages
```java
-final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
-
-scheduleService.setMessageModel(MessageModel.CLUSTERING);
-scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {
-
- @Override
- public void doPullTask(MessageQueue mq, PullTaskContext context) {
- MQPullConsumer consumer = context.getPullConsumer();
- try {
-
- long offset = consumer.fetchConsumeOffset(mq, false);
- if (offset < 0)
- offset = 0;
-
- PullResult pullResult = consumer.pull(mq, "*", offset, 32);
- System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
- switch (pullResult.getPullStatus()) {
- case FOUND:
- break;
- case NO_MATCHED_MSG:
- break;
- case NO_NEW_MSG:
- case OFFSET_ILLEGAL:
- break;
- default:
- break;
- }
- consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
-
- //consume message auto
- context.setPullNextDelayTimeMillis(100);
- } catch (Exception e) {
- e.printStackTrace();
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class ScheduledMessageProducer {
+
+ public static void main(String[] args) throws Exception {
+ // Instantiate a producer to send scheduled messages
+ DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
+ // Launch producer
+ producer.start();
+ int totalMessagesToSend = 100;
+ for (int i = 0; i < totalMessagesToSend; i++) {
+ Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
+ // This message will be delivered to consumer 10 seconds later.
+ message.setDelayTimeLevel(3);
+ // Send the message
+ producer.send(message);
}
- }
-});
-
-scheduleService.start();
-```
-
-#### Have fun with `MQPullConsumerScheduleService`.
+ // Shutdown producer after use.
+ producer.shutdown();
+ }
+
+}
+```
\ No newline at end of file
[2/2] incubator-rocketmq-site git commit: Add a verfication section
Posted by li...@apache.org.
Add a verfication section
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/10f8abe5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/10f8abe5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/10f8abe5
Branch: refs/heads/master
Commit: 10f8abe52ce5c1fe81281400ef8d2fc9b77b76b9
Parents: 3bb9502
Author: Li Zhanhui <li...@gmail.com>
Authored: Wed Jun 7 12:41:13 2017 +0800
Committer: Li Zhanhui <li...@gmail.com>
Committed: Wed Jun 7 12:41:13 2017 +0800
----------------------------------------------------------------------
.gitignore | 1 +
_docs/17-rmq-schedule-example.md | 6 +++++-
2 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/10f8abe5/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 928dad0..ed99deb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,4 @@ example/_site
Gemfile.lock
node_modules
npm-debug.log*
+*.iml
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/10f8abe5/_docs/17-rmq-schedule-example.md
----------------------------------------------------------------------
diff --git a/_docs/17-rmq-schedule-example.md b/_docs/17-rmq-schedule-example.md
index 0340f21..1fa1d64 100644
--- a/_docs/17-rmq-schedule-example.md
+++ b/_docs/17-rmq-schedule-example.md
@@ -79,4 +79,8 @@ public class ScheduledMessageProducer {
}
}
-```
\ No newline at end of file
+```
+
+3. Verification
+
+You should see messages are consumed about 10 seconds later than their storing time.
\ No newline at end of file