You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/21 07:40:16 UTC

[rocketmq] branch develop updated: [RIP-9]To add English version of Examples of Ordered Messages in docs/en/ (#795)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5dc64d0  [RIP-9]To add English version of Examples of Ordered Messages in docs/en/ (#795)
5dc64d0 is described below

commit 5dc64d03e0886dc0a0ab2ddfcd5fc75c602c0a3f
Author: eagle101113 <ya...@petrochina.com.cn>
AuthorDate: Thu Feb 21 15:40:11 2019 +0800

    [RIP-9]To add English version of Examples of Ordered Messages in docs/en/ (#795)
    
    * To add the English version of Examples of Ordered Messages in docs/en/
    
    * Format the style using MD gramma.
---
 docs/en/Example_Orderly.md | 232 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 232 insertions(+)

diff --git a/docs/en/Example_Orderly.md b/docs/en/Example_Orderly.md
new file mode 100644
index 0000000..dbb02cf
--- /dev/null
+++ b/docs/en/Example_Orderly.md
@@ -0,0 +1,232 @@
+# 2 Example for ordered messages
+
+RocketMQ provides ordered messages using FIFO order. All related messages need to be sent into the same message queue in an orderly manner.
+
+The following demonstrates ordered messages by ensuring order of create, pay, send and finish steps of sales order process.
+
+## 2.1 produce ordered messages
+```
+package org.apache.rocketmq.example.order2
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/*
+* ordered messages producer
+*/
+public class Producer {
+
+    public static void main(String[] args) throws Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        producer.start();
+        String[] tags = new String[]{"TagA", "TagC", "TagD"};
+        // sales orders list
+        List<OrderStep> orderList = new Producer().buildOrders();
+
+        Date date = new Date();
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateStr = sdf.format(date);
+
+        for (int i = 0; i < 10; i++) {
+            // generate message timestamp
+            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
+            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
+
+            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
+                @Override
+                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                    Long id = (Long) arg;  //message queue is selected by #salesOrderID
+                    long index = id % mqs.size();
+                    return mqs.get((int) index);
+                }
+            }, orderList.get(i).getOrderId());
+
+            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
+                    sendResult.getSendStatus(),
+                    sendResult.getMessageQueue().getQueueId(),
+                    body));
+        }
+
+        producer.shutdown();
+    }
+
+    /**
+     * each sales order step
+     */
+    private static class OrderStep {
+        private long orderId;
+        private String desc;
+
+        public long getOrderId() {
+            return orderId;
+        }
+
+        public void setOrderId(long orderId) {
+            this.orderId = orderId;
+        }
+
+        public String getDesc() {
+            return desc;
+        }
+
+        public void setDesc(String desc) {
+            this.desc = desc;
+        }
+
+        @Override
+        public String toString() {
+            return "OrderStep{" +
+                    "orderId=" + orderId +
+                    ", desc='" + desc + '\'' +
+                    '}';
+        }
+    }
+
+    /**
+     * to generate ten OrderStep objects for three sales orders:
+     *  #SalesOrder "15103111039L": create, pay, send, finish;
+     *  #SalesOrder "15103111065L": create, pay, finish;
+     *  #SalesOrder "15103117235L": create, pay, finish;
+     */    
+	private List<OrderStep> buildOrders() {
+		
+        List<OrderStep> orderList = new ArrayList<OrderStep>();
+
+        //create sales order with orderid="15103111039L"
+        OrderStep orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103111039L);
+        orderDemo.setDesc("create");
+        orderList.add(orderDemo);
+
+        //create sales order with orderid="15103111065L"
+        orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103111065L);
+        orderDemo.setDesc("create");
+        orderList.add(orderDemo);
+
+        //pay sales order #"15103111039L"
+        orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103111039L);
+        orderDemo.setDesc("pay");
+        orderList.add(orderDemo);
+
+        //create sales order with orderid="15103117235L"
+        orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103117235L);
+        orderDemo.setDesc("create");
+        orderList.add(orderDemo);
+
+        //pay sales order #"15103111065L"
+        orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103111065L);
+        orderDemo.setDesc("pay");
+        orderList.add(orderDemo);
+
+        //pay sales order #"15103117235L"
+        orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103117235L);
+        orderDemo.setDesc("pay");
+        orderList.add(orderDemo);
+
+        //mark sales order #"15103111065L" as "finish"
+        orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103111065L);
+        orderDemo.setDesc("finish");
+        orderList.add(orderDemo);
+
+        //mark mark sales order #"15103111039L" as "send"
+        orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103111039L);
+        orderDemo.setDesc("send");
+        orderList.add(orderDemo);
+
+        ////mark sales order #"15103117235L" as "finish"
+        orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103117235L);
+        orderDemo.setDesc("finish");
+        orderList.add(orderDemo);
+
+        //mark sales order #"15103111039L" as "finish"
+        orderDemo = new OrderStep();
+        orderDemo.setOrderId(15103111039L);
+        orderDemo.setDesc("finish");
+        orderList.add(orderDemo);
+
+        return orderList;
+    }
+}
+
+```
+
+## 2.2 Consume ordered messages
+
+```
+
+package org.apache.rocketmq.example.order2;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * consume messages in order
+ */
+public class ConsumerInOrder {
+
+    public static void main(String[] args) throws Exception {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
+        consumer.setNamesrvAddr("127.0.0.1:9876");
+        /**
+         * when the consumer is first run, the start point of message queue where it can get messages will be set.
+         * or if it is restarted, it will continue from the last place to get messages.
+         */
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+
+        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+
+        consumer.registerMessageListener(new MessageListenerOrderly() {
+
+            Random random = new Random();
+
+            @Override
+            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+                context.setAutoCommit(true);
+                for (MessageExt msg : msgs) {
+                    // one consumer for each message queue, and messages order are kept in a single message queue.
+                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
+                }
+
+                try {
+                    TimeUnit.SECONDS.sleep(random.nextInt(10));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                return ConsumeOrderlyStatus.SUCCESS;
+            }
+        });
+
+        consumer.start();
+
+        System.out.println("Consumer Started.");
+    }
+}
+
+```
+
+