You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/10 02:13:38 UTC
[rocketmq-site] branch new-official-website updated: to#550 fix async send message example (#551)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch new-official-website
in repository https://gitbox.apache.org/repos/asf/rocketmq-site.git
The following commit(s) were added to refs/heads/new-official-website by this push:
new 3cd496de5 to#550 fix async send message example (#551)
3cd496de5 is described below
commit 3cd496de5d43ab2ab807e2ec079d7ed130cee7d8
Author: Zhongliang.Chen <ch...@gmail.com>
AuthorDate: Wed May 10 10:13:33 2023 +0800
to#550 fix async send message example (#551)
---
docs/02-producer/02message1.md | 47 +++++++++++++---------
.../current/02-producer/02message1.md | 47 +++++++++++++---------
2 files changed, 58 insertions(+), 36 deletions(-)
diff --git a/docs/02-producer/02message1.md b/docs/02-producer/02message1.md
index 2882d0392..a8ec08fc0 100644
--- a/docs/02-producer/02message1.md
+++ b/docs/02-producer/02message1.md
@@ -114,26 +114,37 @@ public class AsyncProducer {
// 启动producer
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
- for (int i = 0; i < 100; i++) {
- final int index = i;
- // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
- Message msg = new Message("TopicTest",
- "TagA",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
- // 异步发送消息, 发送结果通过callback返回给客户端
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.printf("%-10d OK %s %n", index,
- sendResult.getMsgId());
- }
- @Override
- public void onException(Throwable e) {
- System.out.printf("%-10d Exception %s %n", index, e);
- e.printStackTrace();
+ int messageCount = 100;
+ final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
+ for (int i = 0; i < messageCount; i++) {
+ try {
+ final int index = i;
+ // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ // 异步发送消息, 发送结果通过callback返回给客户端
+ producer.send(msg, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ System.out.printf("%-10d OK %s %n", index,
+ sendResult.getMsgId());
+ countDownLatch.countDown();
+ }
+ @Override
+ public void onException(Throwable e) {
+ System.out.printf("%-10d Exception %s %n", index, e);
+ e.printStackTrace();
+ countDownLatch.countDown();
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ countDownLatch.countDown();
}
- });
}
+ //异步发送,如果要求可靠传输,必须要等回调接口返回明确结果后才能结束逻辑,否则立即关闭Producer可能导致部分消息尚未传输成功
+ countDownLatch.await(5, TimeUnit.SECONDS);
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
diff --git a/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md b/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md
index eddb62713..e09baa592 100644
--- a/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md
+++ b/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md
@@ -109,26 +109,37 @@ public class AsyncProducer {
// Start Producer
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
- for (int i = 0; i < 100; i++) {
- final int index = i;
- // Create a message and set the topic, tag, body and so on. The tag can be understood as a label to categorize the message, and RocketMQ can filter the tag on the consumer side.
- Message msg = new Message("TopicTest",
- "TagA",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
- // Send a message asynchronously, the result is returned to the client by callback
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.printf("%-10d OK %s %n", index,
- sendResult.getMsgId());
- }
- @Override
- public void onException(Throwable e) {
- System.out.printf("%-10d Exception %s %n", index, e);
- e.printStackTrace();
+ int messageCount = 100;
+ final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
+ for (int i = 0; i < messageCount; i++) {
+ try {
+ final int index = i;
+ // Create a message and set its topic, tag, body, and so on. A tag is a label that further categorizes messages; RocketMQ can filter messages by tag on the consumer side.
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ // Send the message asynchronously; the result is returned to the client through the callback
+ producer.send(msg, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ System.out.printf("%-10d OK %s %n", index,
+ sendResult.getMsgId());
+ countDownLatch.countDown();
+ }
+ @Override
+ public void onException(Throwable e) {
+ System.out.printf("%-10d Exception %s %n", index, e);
+ e.printStackTrace();
+ countDownLatch.countDown();
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ countDownLatch.countDown();
}
- });
}
+ // If asynchronous sending must be reliable, do not end the logic until the callback interface has returned a definite result. Otherwise, closing the Producer immediately may leave some messages not yet successfully transmitted.
+ countDownLatch.await(5, TimeUnit.SECONDS);
// Close the producer once it is no longer in use
producer.shutdown();
}