You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/21 07:58:29 UTC

[rocketmq] branch master updated: [RIP-9] Commit docs Example_OpenMessaging.md (#797)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f37ff2  [RIP-9] Commit docs Example_OpenMessaging.md (#797)
8f37ff2 is described below

commit 8f37ff27db02177a1e336c8d5ee1a089a4a23939
Author: xiongwu1 <11...@qq.com>
AuthorDate: Thu Feb 21 15:58:25 2019 +0800

    [RIP-9] Commit docs Example_OpenMessaging.md (#797)
    
    [RIP-9] Commit docs Example_OpenMessaging.md
---
 docs/en/Example_OpenMessaging.md | 118 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 118 insertions(+)

diff --git a/docs/en/Example_OpenMessaging.md b/docs/en/Example_OpenMessaging.md
new file mode 100644
index 0000000..026e76e
--- /dev/null
+++ b/docs/en/Example_OpenMessaging.md
@@ -0,0 +1,118 @@
+# OpenMessaging Example
+[OpenMessaging](https://openmessaging.github.io/), which includes the establishment of industry guidelines and messaging, streaming specifications to provide a common framework for finance, ecommerce, IoT and big-data area. The design principles are the cloud-oriented, simplicity, flexibility, and language independent in distributed heterogeneous environments. Conformance to these specifications will make it possible to develop a heterogeneous messaging applications across all major plat [...]
+
+RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha, the following examples demonstrate how to access RocketMQ based on OpenMessaging.
+
+## OMSProducer
+The following example shows how to send message to RocketMQ broker in synchronous, asynchronous, or one-way transmissions.
+
+```
+public class OMSProducer {
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+        final Producer producer = messagingAccessPoint.createProducer();
+
+        messagingAccessPoint.startup();
+        System.out.printf("MessagingAccessPoint startup OK%n");
+
+        producer.startup();
+        System.out.printf("Producer startup OK%n");
+
+        {
+            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+            SendResult sendResult = producer.send(message);
+            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
+        }
+
+        {
+            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            result.addListener(new PromiseListener<SendResult>() {
+                @Override
+                public void operationCompleted(Promise<SendResult> promise) {
+                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
+                }
+
+                @Override
+                public void operationFailed(Promise<SendResult> promise) {
+                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
+                }
+            });
+        }
+
+        {
+            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            System.out.printf("Send oneway message OK%n");
+        }
+
+        producer.shutdown();
+        messagingAccessPoint.shutdown();
+    }
+}
+```
+## OMSPullConsumer
+Use OMS PullConsumer to poll messages from a specified queue.
+
+```
+public class OMSPullConsumer {
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+        messagingAccessPoint.startup();
+        System.out.printf("MessagingAccessPoint startup OK%n");
+        
+        consumer.startup();
+        System.out.printf("Consumer startup OK%n");
+
+        Message message = consumer.poll();
+        if (message != null) {
+            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
+            System.out.printf("Received one message: %s%n", msgId);
+            consumer.ack(msgId);
+        }
+
+        consumer.shutdown();
+        messagingAccessPoint.shutdown();
+    }
+}
+
+```
+## OMSPushConsumer
+Attaches OMS PushConsumer to a specified queue and consumes messages by MessageListener
+
+```
+public class OMSPushConsumer {
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+        final PushConsumer consumer = messagingAccessPoint.
+            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+        messagingAccessPoint.startup();
+        System.out.printf("MessagingAccessPoint startup OK%n");
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                consumer.shutdown();
+                messagingAccessPoint.shutdown();
+            }
+        }));
+        
+        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
+            @Override
+            public void onMessage(final Message message, final ReceivedMessageContext context) {
+                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
+                context.ack();
+            }
+        });
+        
+    }
+}
+```