You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/10 02:13:38 UTC

[rocketmq-site] branch new-official-website updated: to#550 fix async send message example (#551)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch new-official-website
in repository https://gitbox.apache.org/repos/asf/rocketmq-site.git


The following commit(s) were added to refs/heads/new-official-website by this push:
     new 3cd496de5 to#550 fix async send message example (#551)
3cd496de5 is described below

commit 3cd496de5d43ab2ab807e2ec079d7ed130cee7d8
Author: Zhongliang.Chen <ch...@gmail.com>
AuthorDate: Wed May 10 10:13:33 2023 +0800

    to#550 fix async send message example (#551)
---
 docs/02-producer/02message1.md                     | 47 +++++++++++++---------
 .../current/02-producer/02message1.md              | 47 +++++++++++++---------
 2 files changed, 58 insertions(+), 36 deletions(-)

diff --git a/docs/02-producer/02message1.md b/docs/02-producer/02message1.md
index 2882d0392..a8ec08fc0 100644
--- a/docs/02-producer/02message1.md
+++ b/docs/02-producer/02message1.md
@@ -114,26 +114,37 @@ public class AsyncProducer {
     // 启动producer
     producer.start();
     producer.setRetryTimesWhenSendAsyncFailed(0);
-    for (int i = 0; i < 100; i++) {
-      final int index = i;
-      // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
-      Message msg = new Message("TopicTest",
-        "TagA",
-        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
-      // 异步发送消息, 发送结果通过callback返回给客户端
-      producer.send(msg, new SendCallback() {
-        @Override
-        public void onSuccess(SendResult sendResult) {
-          System.out.printf("%-10d OK %s %n", index,
-            sendResult.getMsgId());
-        }
-        @Override
-        public void onException(Throwable e) {
-          System.out.printf("%-10d Exception %s %n", index, e);
-          e.printStackTrace();
+    int messageCount = 100;
+    final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
+    for (int i = 0; i < messageCount; i++) {
+      try {
+          final int index = i;
+          // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
+          Message msg = new Message("TopicTest",
+            "TagA",
+            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+          // 异步发送消息, 发送结果通过callback返回给客户端
+          producer.send(msg, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+              System.out.printf("%-10d OK %s %n", index,
+                sendResult.getMsgId());
+              countDownLatch.countDown();
+            }
+            @Override
+            public void onException(Throwable e) {
+              System.out.printf("%-10d Exception %s %n", index, e);
+              e.printStackTrace();
+              countDownLatch.countDown();
+            }
+          });
+        } catch (Exception e) {
+            e.printStackTrace();
+            countDownLatch.countDown();
         }
-      });
     }
+    //异步发送,如果要求可靠传输,必须要等回调接口返回明确结果后才能结束逻辑,否则立即关闭Producer可能导致部分消息尚未传输成功
+    countDownLatch.await(5, TimeUnit.SECONDS);
     // 一旦producer不再使用,关闭producer
     producer.shutdown();
   }
diff --git a/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md b/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md
index eddb62713..e09baa592 100644
--- a/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md
+++ b/i18n/en/docusaurus-plugin-content-docs/current/02-producer/02message1.md
@@ -109,26 +109,37 @@ public class AsyncProducer {
     // Start Producer
     producer.start();
     producer.setRetryTimesWhenSendAsyncFailed(0);
-    for (int i = 0; i < 100; i++) {
-      final int index = i;
-      // Create a message and set the topic, tag, body and so on. The tag can be understood as a label to categorize the message, and RocketMQ can filter the tag on the consumer side.
-      Message msg = new Message("TopicTest",
-        "TagA",
-        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
-      // Send a message asynchronously, the result is returned to the client by callback
-      producer.send(msg, new SendCallback() {
-        @Override
-        public void onSuccess(SendResult sendResult) {
-          System.out.printf("%-10d OK %s %n", index,
-            sendResult.getMsgId());
-        }
-        @Override
-        public void onException(Throwable e) {
-          System.out.printf("%-10d Exception %s %n", index, e);
-          e.printStackTrace();
+    int messageCount = 100;
+    final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
+    for (int i = 0; i < messageCount; i++) {
+      try {
+          final int index = i;
+          // Create a message and set the topic, tag, body and so on. The tag can be understood as a label to categorize the message, and RocketMQ can filter the tag on the consumer side.
+          Message msg = new Message("TopicTest",
+            "TagA",
+            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+          // Send the message asynchronously; the result is returned to the client via the callback
+          producer.send(msg, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+              System.out.printf("%-10d OK %s %n", index,
+                sendResult.getMsgId());
+              countDownLatch.countDown();
+            }
+            @Override
+            public void onException(Throwable e) {
+              System.out.printf("%-10d Exception %s %n", index, e);
+              e.printStackTrace();
+              countDownLatch.countDown();
+            }
+          });
+        } catch (Exception e) {
+            e.printStackTrace();
+            countDownLatch.countDown();
         }
-      });
     }
+    // If reliable transmission is required for asynchronous sending, do not finish until every callback has returned a definite result; otherwise, shutting down the Producer immediately may leave some messages untransmitted.
+    countDownLatch.await(5, TimeUnit.SECONDS);
     // Close the producer once it is no longer in use
     producer.shutdown();
   }