Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/21 07:58:29 UTC
[rocketmq] branch master updated: [RIP-9] Commit docs Example_OpenMessaging.md (#797)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/master by this push:
new 8f37ff2 [RIP-9] Commit docs Example_OpenMessaging.md (#797)
8f37ff2 is described below
commit 8f37ff27db02177a1e336c8d5ee1a089a4a23939
Author: xiongwu1 <11...@qq.com>
AuthorDate: Thu Feb 21 15:58:25 2019 +0800
[RIP-9] Commit docs Example_OpenMessaging.md (#797)
[RIP-9] Commit docs Example_OpenMessaging.md
---
docs/en/Example_OpenMessaging.md | 118 +++++++++++++++++++++++++++++++++++++++
1 file changed, 118 insertions(+)
diff --git a/docs/en/Example_OpenMessaging.md b/docs/en/Example_OpenMessaging.md
new file mode 100644
index 0000000..026e76e
--- /dev/null
+++ b/docs/en/Example_OpenMessaging.md
@@ -0,0 +1,118 @@
+# OpenMessaging Example
+[OpenMessaging](https://openmessaging.github.io/) establishes industry guidelines and messaging and streaming specifications that provide a common framework for finance, e-commerce, IoT, and big-data applications. Its design principles are cloud orientation, simplicity, flexibility, and language independence in distributed heterogeneous environments. Conformance to these specifications makes it possible to develop heterogeneous messaging applications across all major plat [...]
+
+RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples demonstrate how to access RocketMQ through OpenMessaging.
+
+## OMSProducer
+The following example shows how to send messages to a RocketMQ broker synchronously, asynchronously, or one-way.
+
+```
+public class OMSProducer {
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+        final Producer producer = messagingAccessPoint.createProducer();
+
+        messagingAccessPoint.startup();
+        System.out.printf("MessagingAccessPoint startup OK%n");
+
+        producer.startup();
+        System.out.printf("Producer startup OK%n");
+
+        {
+            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+            SendResult sendResult = producer.send(message);
+            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
+        }
+
+        {
+            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            result.addListener(new PromiseListener<SendResult>() {
+                @Override
+                public void operationCompleted(Promise<SendResult> promise) {
+                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
+                }
+
+                @Override
+                public void operationFailed(Promise<SendResult> promise) {
+                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
+                }
+            });
+        }
+
+        {
+            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            System.out.printf("Send oneway message OK%n");
+        }
+
+        producer.shutdown();
+        messagingAccessPoint.shutdown();
+    }
+}
+```
+## OMSPullConsumer
+Use an OMS PullConsumer to poll messages from a specified queue.
+
+```
+public class OMSPullConsumer {
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+        messagingAccessPoint.startup();
+        System.out.printf("MessagingAccessPoint startup OK%n");
+
+        consumer.startup();
+        System.out.printf("Consumer startup OK%n");
+
+        Message message = consumer.poll();
+        if (message != null) {
+            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
+            System.out.printf("Received one message: %s%n", msgId);
+            consumer.ack(msgId);
+        }
+
+        consumer.shutdown();
+        messagingAccessPoint.shutdown();
+    }
+}
+
+```
+## OMSPushConsumer
+Attach an OMS PushConsumer to a specified queue and consume messages with a MessageListener.
+
+```
+public class OMSPushConsumer {
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+        final PushConsumer consumer = messagingAccessPoint.
+            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+        messagingAccessPoint.startup();
+        System.out.printf("MessagingAccessPoint startup OK%n");
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                consumer.shutdown();
+                messagingAccessPoint.shutdown();
+            }
+        }));
+
+        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
+            @Override
+            public void onMessage(final Message message, final ReceivedMessageContext context) {
+                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
+                context.ack();
+            }
+        });
+        consumer.startup(); // start the consumer once the listener is attached, as the pull example does
+    }
+}
+```