You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/06 02:51:52 UTC

[rocketmq] branch develop updated: [ISSUE #4323] Batch example add the default NamesrvAddr (#4328)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ecb24c453 [ISSUE #4323] Batch example add the default NamesrvAddr (#4328)
ecb24c453 is described below

commit ecb24c453cacd937db5707afcdc8d9369ce99b2f
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Mon Jun 6 10:51:46 2022 +0800

    [ISSUE #4323] Batch example add the default NamesrvAddr (#4328)
    
    * batch example add the default NamesrvAddr
    
    * Add comments
    
    * update annotation
---
 .../example/batch/SimpleBatchProducer.java         | 20 +++++++++-----
 .../rocketmq/example/batch/SplitBatchProducer.java | 31 +++++++++++++++-------
 2 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
index cf566aa15..30863032e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
@@ -20,22 +20,30 @@ package org.apache.rocketmq.example.batch;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 
 public class SimpleBatchProducer {
 
+    public static final String PRODUCER_GROUP = "BatchProducerGroupName";
+    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+    public static final String TOPIC = "BatchTest";
+    public static final String TAG = "Tag";
+
     public static void main(String[] args) throws Exception {
-        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
+        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
+        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
+//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
         producer.start();
 
         //If you just send messages of no more than 1MiB at a time, it is easy to use batch
         //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
-        String topic = "BatchTest";
         List<Message> messages = new ArrayList<>();
-        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
-        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
-        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
+        messages.add(new Message(TOPIC, TAG, "OrderID001", "Hello world 0".getBytes()));
+        messages.add(new Message(TOPIC, TAG, "OrderID002", "Hello world 1".getBytes()));
+        messages.add(new Message(TOPIC, TAG, "OrderID003", "Hello world 2".getBytes()));
 
-        producer.send(messages);
+        SendResult sendResult = producer.send(messages);
+        System.out.printf("%s", sendResult);
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
index f9495c417..aca4f1683 100644
--- a/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
@@ -22,34 +22,44 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 
 public class SplitBatchProducer {
 
+    public static final String PRODUCER_GROUP = "BatchProducerGroupName";
+    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+
+    public static final int MESSAGE_COUNT = 100 * 1000;
+    public static final String TOPIC = "BatchTest";
+    public static final String TAG = "Tag";
+
     public static void main(String[] args) throws Exception {
 
-        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
+        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
+        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
+//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
         producer.start();
 
         //large batch
-        String topic = "BatchTest";
-        List<Message> messages = new ArrayList<>(100 * 1000);
-        for (int i = 0; i < 100 * 1000; i++) {
-            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
+        List<Message> messages = new ArrayList<>(MESSAGE_COUNT);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            messages.add(new Message(TOPIC, TAG, "OrderID" + i, ("Hello world " + i).getBytes()));
         }
 
         //split the large batch into small ones:
         ListSplitter splitter = new ListSplitter(messages);
         while (splitter.hasNext()) {
             List<Message> listItem = splitter.next();
-            producer.send(listItem);
+            SendResult sendResult = producer.send(listItem);
+            System.out.printf("%s", sendResult);
         }
     }
 
 }
 
 class ListSplitter implements Iterator<List<Message>> {
-    private int sizeLimit = 1000 * 1000;
+    private static final int SIZE_LIMIT = 1000 * 1000;
     private final List<Message> messages;
     private int currIndex;
 
@@ -73,8 +83,9 @@ class ListSplitter implements Iterator<List<Message>> {
             for (Map.Entry<String, String> entry : properties.entrySet()) {
                 tmpSize += entry.getKey().length() + entry.getValue().length();
             }
-            tmpSize = tmpSize + 20; //for log overhead
-            if (tmpSize > sizeLimit) {
+            //for log overhead
+            tmpSize = tmpSize + 20;
+            if (tmpSize > SIZE_LIMIT) {
                 //it is unexpected that single message exceeds the sizeLimit
                 //here just let it go, otherwise it will block the splitting process
                 if (nextIndex - currIndex == 0) {
@@ -83,7 +94,7 @@ class ListSplitter implements Iterator<List<Message>> {
                 }
                 break;
             }
-            if (tmpSize + totalSize > sizeLimit) {
+            if (tmpSize + totalSize > SIZE_LIMIT) {
                 break;
             } else {
                 totalSize += tmpSize;