You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/01 02:19:05 UTC

[rocketmq-site] branch new-official-website updated: Add normal message send chinese document for new official website (#144)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch new-official-website
in repository https://gitbox.apache.org/repos/asf/rocketmq-site.git


The following commit(s) were added to refs/heads/new-official-website by this push:
     new 488b9342 Add normal message send chinese document for new official website (#144)
488b9342 is described below

commit 488b9342820d1755206d03cb7c0652fde51c43c2
Author: rongtong <ji...@163.com>
AuthorDate: Fri Jul 1 10:19:00 2022 +0800

    Add normal message send chinese document for new official website (#144)
---
 docs/05message1.md                                 | 141 +++++++++++++++++++++
 "docs/picture/Oneway\345\217\221\351\200\201.png"  | Bin 0 -> 46451 bytes
 ...20\214\346\255\245\345\217\221\351\200\201.png" | Bin 0 -> 66064 bytes
 ...74\202\346\255\245\345\217\221\351\200\201.png" | Bin 0 -> 70219 bytes
 4 files changed, 141 insertions(+)

diff --git a/docs/05message1.md b/docs/05message1.md
index a6e57a6a..d14ec235 100644
--- a/docs/05message1.md
+++ b/docs/05message1.md
@@ -1 +1,142 @@
 # 普通消息发送
+
+**1.向集群中创建Topic**
+
+RocketMQ集群是默认开启了autoCreateTopicEnable配置,会自动为发送的消息创建Topic,如果autoCreateTopicEnable没有开启,也可以利用RocketMQ Admin工具创建目标Topic。
+
+```shell
+> sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -n 127.0.0.1:9876
+create topic to 127.0.0.1:10911 success.
+TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes=null]
+```
+
+可以看到在执行完命令后,在该台Broker机器上创建了8个队列,名为TopicTest的Topic。
+
+**2.添加客户端依赖**
+
+首先需要在JAVA程序中添加RocketMQ的客户端依赖。
+
+maven:
+``` java
+<dependency>
+  <groupId>org.apache.rocketmq</groupId>
+  <artifactId>rocketmq-client</artifactId>
+  <version>4.9.4</version>
+</dependency>
+```
+gradle:
+``` java 
+compile 'org.apache.rocketmq:rocketmq-client:4.9.4'
+```
+
+**3.消息发送**
+
+Apache RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。
+
+(1) 同步发送
+
+![同步发送](picture/同步发送.png)
+
+首先是使用Producer发送同步消息,同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
+``` java
+public class SyncProducer {
+  public static void main(String[] args) throws Exception {
+    // 初始化一个producer并设置Producer group name
+    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //(1)
+    // 设置NameServer地址
+    producer.setNamesrvAddr("localhost:9876");  //(2)
+    // 启动producer
+    producer.start();
+    for (int i = 0; i < 100; i++) {
+      // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
+      Message msg = new Message("TopicTest" /* Topic */,
+        "TagA" /* Tag */,
+        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
+        );   //(3)
+      // 利用producer进行发送,并同步等待发送结果
+      SendResult sendResult = producer.send(msg);   //(4)
+      System.out.printf("%s%n", sendResult);
+    }
+    // 一旦producer不再使用,关闭producer
+    producer.shutdown();
+  }
+}
+```
+
+同步发送的整个代码流程如下:(1)首先会创建一个producer,普通消息可以创建DefaultMQProducer,创建时需要填写生产组的名称,生产者组是指同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。(2)然后设置NameServer的地址,Apache RocketMQ很多方式设置NameServer地址(客户端配置中有介绍),这里是在代码中调用producer的API setNamesrvAddr进行设置,如果有多个NameServer,中间以分号隔开,比如"127.0.0.2:9876;127.0.0.3:9876"。(3) 第三步是构建消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤。(4)最后调用send接口将消息发送出去,同步发送等待结果最后返回SendResult,SendResut包含实际发送状态还包括SEND_OK(发送成功), FLUSH_DISK_TIMEOUT(刷盘超时), FLUSH_SLAVE_TIMEOUT(同步到备超时 [...]
+
+(2)异步发送
+
+![异步发送](picture/异步发送.png)
+
+异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。异步发送需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
+
+``` java
+public class AsyncProducer {
+  public static void main(String[] args) throws Exception {
+    // 初始化一个producer并设置Producer group name
+    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+    // 设置NameServer地址
+    producer.setNamesrvAddr("localhost:9876");
+    // 启动producer
+    producer.start();
+    producer.setRetryTimesWhenSendAsyncFailed(0);
+    for (int i = 0; i < 100; i++) {
+      final int index = i;
+      // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
+      Message msg = new Message("TopicTest",
+        "TagA",
+        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+      // 异步发送消息, 发送结果通过callback返回给客户端
+      producer.send(msg, new SendCallback() {
+        @Override
+        public void onSuccess(SendResult sendResult) {
+          System.out.printf("%-10d OK %s %n", index,
+            sendResult.getMsgId());
+        }
+        @Override
+        public void onException(Throwable e) {
+          System.out.printf("%-10d Exception %s %n", index, e);
+          e.printStackTrace();
+        }
+      });
+    }
+    // 一旦producer不再使用,关闭producer
+    producer.shutdown();
+  }
+}
+```
+
+异步发送与同步发送代码唯一区别在于调用send接口的参数不同,异步发送不会等待发送返回,取而代之的是send方法需要传入SendCallback的实现,SendCallback接口主要有onSuccess和onException两个方法,表示消息发送成功和消息发送失败。
+
+(3) 单向模式发送
+
+![Oneway发送](picture/Oneway发送.png)
+
+发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
+
+``` java
+public class OnewayProducer {
+  public static void main(String[] args) throws Exception{
+    // 初始化一个producer并设置Producer group name
+    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+    // 设置NameServer地址
+    producer.setNamesrvAddr("localhost:9876");
+    // 启动producer
+    producer.start();
+    for (int i = 0; i < 100; i++) {
+      // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
+      Message msg = new Message("TopicTest" /* Topic */,
+        "TagA" /* Tag */,
+        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
+      );
+      // 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
+      producer.sendOneway(msg);
+    }
+     // 一旦producer不再使用,关闭producer
+     producer.shutdown();
+  }
+}
+```
+
+单向模式调用sendOneway,不会对返回结果有任何等待和处理。
diff --git "a/docs/picture/Oneway\345\217\221\351\200\201.png" "b/docs/picture/Oneway\345\217\221\351\200\201.png"
new file mode 100644
index 00000000..05ad45e4
Binary files /dev/null and "b/docs/picture/Oneway\345\217\221\351\200\201.png" differ
diff --git "a/docs/picture/\345\220\214\346\255\245\345\217\221\351\200\201.png" "b/docs/picture/\345\220\214\346\255\245\345\217\221\351\200\201.png"
new file mode 100644
index 00000000..373bef31
Binary files /dev/null and "b/docs/picture/\345\220\214\346\255\245\345\217\221\351\200\201.png" differ
diff --git "a/docs/picture/\345\274\202\346\255\245\345\217\221\351\200\201.png" "b/docs/picture/\345\274\202\346\255\245\345\217\221\351\200\201.png"
new file mode 100644
index 00000000..52827994
Binary files /dev/null and "b/docs/picture/\345\274\202\346\255\245\345\217\221\351\200\201.png" differ