You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/21 07:40:16 UTC
[rocketmq] branch develop updated: [RIP-9]To add English version of
Examples of Ordered Messages in docs/en/ (#795)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5dc64d0 [RIP-9]To add English version of Examples of Ordered Messages in docs/en/ (#795)
5dc64d0 is described below
commit 5dc64d03e0886dc0a0ab2ddfcd5fc75c602c0a3f
Author: eagle101113 <ya...@petrochina.com.cn>
AuthorDate: Thu Feb 21 15:40:11 2019 +0800
[RIP-9]To add English version of Examples of Ordered Messages in docs/en/ (#795)
* To add the English version of Examples of Ordered Messages in docs/en/
* Format the style using MD gramma.
---
docs/en/Example_Orderly.md | 232 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 232 insertions(+)
diff --git a/docs/en/Example_Orderly.md b/docs/en/Example_Orderly.md
new file mode 100644
index 0000000..dbb02cf
--- /dev/null
+++ b/docs/en/Example_Orderly.md
@@ -0,0 +1,232 @@
+# 2 Example for ordered messages
+
+RocketMQ provides ordered messages using FIFO order. All related messages need to be sent into the same message queue in an orderly manner.
+
+The following demonstrates ordered messages by ensuring order of create, pay, send and finish steps of sales order process.
+
+## 2.1 produce ordered messages
+```
+package org.apache.rocketmq.example.order2
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/*
+* ordered messages producer
+*/
+public class Producer {
+
+ public static void main(String[] args) throws Exception {
+ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+ producer.setNamesrvAddr("127.0.0.1:9876");
+ producer.start();
+ String[] tags = new String[]{"TagA", "TagC", "TagD"};
+ // sales orders list
+ List<OrderStep> orderList = new Producer().buildOrders();
+
+ Date date = new Date();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String dateStr = sdf.format(date);
+
+ for (int i = 0; i < 10; i++) {
+ // generate message timestamp
+ String body = dateStr + " Hello RocketMQ " + orderList.get(i);
+ Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
+
+ SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
+ @Override
+ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+ Long id = (Long) arg; //message queue is selected by #salesOrderID
+ long index = id % mqs.size();
+ return mqs.get((int) index);
+ }
+ }, orderList.get(i).getOrderId());
+
+ System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
+ sendResult.getSendStatus(),
+ sendResult.getMessageQueue().getQueueId(),
+ body));
+ }
+
+ producer.shutdown();
+ }
+
+ /**
+ * each sales order step
+ */
+ private static class OrderStep {
+ private long orderId;
+ private String desc;
+
+ public long getOrderId() {
+ return orderId;
+ }
+
+ public void setOrderId(long orderId) {
+ this.orderId = orderId;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public void setDesc(String desc) {
+ this.desc = desc;
+ }
+
+ @Override
+ public String toString() {
+ return "OrderStep{" +
+ "orderId=" + orderId +
+ ", desc='" + desc + '\'' +
+ '}';
+ }
+ }
+
+ /**
+ * to generate ten OrderStep objects for three sales orders:
+ * #SalesOrder "15103111039L": create, pay, send, finish;
+ * #SalesOrder "15103111065L": create, pay, finish;
+ * #SalesOrder "15103117235L": create, pay, finish;
+ */
+ private List<OrderStep> buildOrders() {
+
+ List<OrderStep> orderList = new ArrayList<OrderStep>();
+
+ //create sales order with orderid="15103111039L"
+ OrderStep orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111039L);
+ orderDemo.setDesc("create");
+ orderList.add(orderDemo);
+
+ //create sales order with orderid="15103111065L"
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111065L);
+ orderDemo.setDesc("create");
+ orderList.add(orderDemo);
+
+ //pay sales order #"15103111039L"
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111039L);
+ orderDemo.setDesc("pay");
+ orderList.add(orderDemo);
+
+ //create sales order with orderid="15103117235L"
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103117235L);
+ orderDemo.setDesc("create");
+ orderList.add(orderDemo);
+
+ //pay sales order #"15103111065L"
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111065L);
+ orderDemo.setDesc("pay");
+ orderList.add(orderDemo);
+
+ //pay sales order #"15103117235L"
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103117235L);
+ orderDemo.setDesc("pay");
+ orderList.add(orderDemo);
+
+ //mark sales order #"15103111065L" as "finish"
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111065L);
+ orderDemo.setDesc("finish");
+ orderList.add(orderDemo);
+
+ //mark mark sales order #"15103111039L" as "send"
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111039L);
+ orderDemo.setDesc("send");
+ orderList.add(orderDemo);
+
+ ////mark sales order #"15103117235L" as "finish"
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103117235L);
+ orderDemo.setDesc("finish");
+ orderList.add(orderDemo);
+
+ //mark sales order #"15103111039L" as "finish"
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111039L);
+ orderDemo.setDesc("finish");
+ orderList.add(orderDemo);
+
+ return orderList;
+ }
+}
+
+```
+
+## 2.2 Consume ordered messages
+
+```
+
+package org.apache.rocketmq.example.order2;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * consume messages in order
+ */
+public class ConsumerInOrder {
+
+ public static void main(String[] args) throws Exception {
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ /**
+ * when the consumer is first run, the start point of message queue where it can get messages will be set.
+ * or if it is restarted, it will continue from the last place to get messages.
+ */
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+
+ consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+
+ consumer.registerMessageListener(new MessageListenerOrderly() {
+
+ Random random = new Random();
+
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+ context.setAutoCommit(true);
+ for (MessageExt msg : msgs) {
+ // one consumer for each message queue, and messages order are kept in a single message queue.
+ System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(random.nextInt(10));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return ConsumeOrderlyStatus.SUCCESS;
+ }
+ });
+
+ consumer.start();
+
+ System.out.println("Consumer Started.");
+ }
+}
+
+```
+
+