You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/07/01 02:31:55 UTC

[rocketmq] branch 5.0.0-beta-auto-batch updated: [ISSUE #3717] Auto batching in producer in branch/5.0.0-beta

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-auto-batch
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-auto-batch by this push:
     new 001dfa88a [ISSUE #3717] Auto batching in producer in branch/5.0.0-beta
001dfa88a is described below

commit 001dfa88ac57c3e57be7bbe10ec8c7859696cad2
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Fri Jul 1 10:31:50 2022 +0800

    [ISSUE #3717] Auto batching in producer in branch/5.0.0-beta
---
 .../rocketmq/client/impl/MQClientManager.java      |  20 +
 .../impl/producer/DefaultMQProducerImpl.java       |  37 ++
 .../client/producer/DefaultMQProducer.java         | 505 +++++++++++++-------
 .../rocketmq/client/producer/MQProducer.java       |  24 +-
 .../client/producer/ProduceAccumulator.java        | 508 +++++++++++++++++++++
 .../client/producer/DefaultMQProducerTest.java     |  40 +-
 .../client/producer/ProduceAccumulatorTest.java    | 170 +++++++
 .../rocketmq/common/message/MessageBatch.java      |   2 +-
 8 files changed, 1129 insertions(+), 177 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 053c049c9..99ab87f41 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.ProduceAccumulator;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 
@@ -31,6 +32,8 @@ public class MQClientManager {
     private AtomicInteger factoryIndexGenerator = new AtomicInteger();
     private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
         new ConcurrentHashMap<String, MQClientInstance>();
+    private ConcurrentMap<String/* clientId */, ProduceAccumulator> accumulatorTable =
+        new ConcurrentHashMap<String, ProduceAccumulator>();
 
     private MQClientManager() {
 
@@ -40,6 +43,23 @@ public class MQClientManager {
         return instance;
     }
 
+    public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig clientConfig) {
+        String clientId = clientConfig.buildMQClientId();
+        ProduceAccumulator accumulator = this.accumulatorTable.get(clientId);
+        if (null == accumulator) {
+            accumulator = new ProduceAccumulator(clientId);
+            ProduceAccumulator prev = this.accumulatorTable.putIfAbsent(clientId, accumulator);
+            if (prev != null) {
+                accumulator = prev;
+                log.warn("Returned Previous ProduceAccumulator for clientId:[{}]", clientId);
+            } else {
+                log.info("Created new ProduceAccumulator for clientId:[{}]", clientId);
+            }
+        }
+
+        return accumulator;
+    }
+
     public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
         return getOrCreateMQClientInstance(clientConfig, null);
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 325ca4fa6..46fa47545 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -523,6 +523,42 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
     }
 
+    public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
+        final long timeout) throws MQClientException, RemotingTooMuchRequestException {
+        long beginStartTime = System.currentTimeMillis();
+        this.makeSureStateOK();
+        Validators.checkMessage(msg, this.defaultMQProducer);
+
+        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+        if (topicPublishInfo != null && topicPublishInfo.ok()) {
+            MessageQueue mq = null;
+            try {
+                List<MessageQueue> messageQueueList =
+                    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
+                Message userMessage = MessageAccessor.cloneMessage(msg);
+                String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
+                userMessage.setTopic(userTopic);
+
+                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
+            } catch (Throwable e) {
+                throw new MQClientException("select message queue threw exception.", e);
+            }
+
+            long costTime = System.currentTimeMillis() - beginStartTime;
+            if (timeout < costTime) {
+                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
+            }
+            if (mq != null) {
+                return mq;
+            } else {
+                throw new MQClientException("select message queue return null.", null);
+            }
+        }
+
+        validateNameServerSetting();
+        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
+    }
+
     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
         return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
     }
@@ -988,6 +1024,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             executeEndTransactionHook(context);
         }
     }
+
     /**
      * DEFAULT ONEWAY -------------------------------------------------------
      */
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 13f104529..abe4551f6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
@@ -39,6 +40,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageId;
@@ -51,10 +53,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 
 /**
  * This class is the entry point for applications intending to send messages. </p>
- *
+ * <p>
  * It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
  * box for most scenarios. </p>
- *
+ * <p>
  * This class aggregates various <code>send</code> methods to deliver messages to brokers. Each of them has pros and
  * cons; you'd better understand strengths and weakness of them before actually coding. </p>
  *
@@ -69,20 +71,20 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
     private final InternalLogger log = ClientLogger.getLog();
     private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
-            ResponseCode.TOPIC_NOT_EXIST,
-            ResponseCode.SERVICE_NOT_AVAILABLE,
-            ResponseCode.SYSTEM_ERROR,
-            ResponseCode.NO_PERMISSION,
-            ResponseCode.NO_BUYER_ID,
-            ResponseCode.NOT_IN_CURRENT_UNIT
+        ResponseCode.TOPIC_NOT_EXIST,
+        ResponseCode.SERVICE_NOT_AVAILABLE,
+        ResponseCode.SYSTEM_ERROR,
+        ResponseCode.NO_PERMISSION,
+        ResponseCode.NO_BUYER_ID,
+        ResponseCode.NOT_IN_CURRENT_UNIT
     ));
 
     /**
      * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
      * important when transactional messages are involved. </p>
-     *
+     * <p>
      * For non-transactional messages, it does not matter as long as it's unique per process. </p>
-     *
+     * <p>
      * See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
      */
     private String producerGroup;
@@ -109,14 +111,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     /**
      * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
-     *
+     * <p>
      * This may potentially cause message duplication which is up to application developers to resolve.
      */
     private int retryTimesWhenSendFailed = 2;
 
     /**
      * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
-     *
+     * <p>
      * This may potentially cause message duplication which is up to application developers to resolve.
      */
     private int retryTimesWhenSendAsyncFailed = 2;
@@ -136,6 +138,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      */
     private TraceDispatcher traceDispatcher = null;
 
+    /**
+     * Switch flag instance for automatic batch message
+     */
+    private boolean autoBatch = false;
+    /**
+     * Instance for batching message automatically
+     */
+    private ProduceAccumulator produceAccumulator = null;
+
     /**
      * Default constructor.
      */
@@ -164,11 +175,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Constructor specifying producer group.
      *
-     * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
-     * @param enableMsgTrace Switch flag instance for message trace.
+     * @param producerGroup        Producer group, see the name-sake field.
+     * @param rpcHook              RPC hook to execute per each remoting command execution.
+     * @param enableMsgTrace       Switch flag instance for message trace.
      * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
-     * trace topic name.
+     *                             trace topic name.
      */
     public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
         final String customizedTraceTopic) {
@@ -178,7 +189,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Constructor specifying producer group.
      *
-     * @param namespace Namespace for this MQ Producer instance.
+     * @param namespace     Namespace for this MQ Producer instance.
      * @param producerGroup Producer group, see the name-sake field.
      */
     public DefaultMQProducer(final String namespace, final String producerGroup) {
@@ -189,7 +200,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * Constructor specifying both producer group and RPC hook.
      *
      * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
+     * @param rpcHook       RPC hook to execute per each remoting command execution.
      */
     public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
         this(null, producerGroup, rpcHook);
@@ -198,20 +209,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Constructor specifying namespace, producer group and RPC hook.
      *
-     * @param namespace Namespace for this MQ Producer instance.
+     * @param namespace     Namespace for this MQ Producer instance.
      * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
+     * @param rpcHook       RPC hook to execute per each remoting command execution.
      */
     public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
     }
 
     /**
      * Constructor specifying producer group and enabled msg trace flag.
      *
-     * @param producerGroup Producer group, see the name-sake field.
+     * @param producerGroup  Producer group, see the name-sake field.
      * @param enableMsgTrace Switch flag instance for message trace.
      */
     public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
@@ -221,10 +233,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.
      *
-     * @param producerGroup Producer group, see the name-sake field.
-     * @param enableMsgTrace Switch flag instance for message trace.
+     * @param producerGroup        Producer group, see the name-sake field.
+     * @param enableMsgTrace       Switch flag instance for message trace.
      * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
-     * trace topic name.
+     *                             trace topic name.
      */
     public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
         this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
@@ -234,18 +246,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
      * name.
      *
-     * @param namespace Namespace for this MQ Producer instance.
-     * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
-     * @param enableMsgTrace Switch flag instance for message trace.
+     * @param namespace            Namespace for this MQ Producer instance.
+     * @param producerGroup        Producer group, see the name-sake field.
+     * @param rpcHook              RPC hook to execute per each remoting command execution.
+     * @param enableMsgTrace       Switch flag instance for message trace.
      * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
-     * trace topic name.
+     *                             trace topic name.
      */
     public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
         boolean enableMsgTrace, final String customizedTraceTopic) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
         //if client open the message trace feature
         if (enableMsgTrace) {
             try {
@@ -269,7 +282,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
             ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
         }
     }
-    
+
     /**
      * Start this producer instance. </p>
      *
@@ -282,6 +295,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     public void start() throws MQClientException {
         this.setProducerGroup(withNamespace(this.producerGroup));
         this.defaultMQProducerImpl.start();
+        if (this.produceAccumulator != null) {
+            this.produceAccumulator.start();
+        }
         if (null != traceDispatcher) {
             try {
                 traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
@@ -297,6 +313,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     @Override
     public void shutdown() {
         this.defaultMQProducerImpl.shutdown();
+        this.produceAccumulator.shutdown();
         if (null != traceDispatcher) {
             traceDispatcher.shutdown();
         }
@@ -324,28 +341,54 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support batch processing
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);
+        } else {
+            return sendDirect(msg, null, null);
+        }
     }
 
     /**
      * Same to {@link #send(Message)} with send timeout specified in addition.
      *
-     * @param msg Message to send.
+     * @param msg     Message to send.
      * @param timeout send timeout.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -357,34 +400,43 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     /**
      * Send message to broker asynchronously. </p>
-     *
+     * <p>
      * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. </p>
-     *
+     * <p>
      * Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
      * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
      * application developers are the one to resolve this potential issue.
      *
-     * @param msg Message to send.
+     * @param msg          Message to send.
      * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
     @Override
     public void send(Message msg,
         SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        this.defaultMQProducerImpl.send(msg, sendCallback);
+        try {
+            if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+                sendByAccumulator(msg, null, sendCallback);
+            } else {
+                sendDirect(msg, null, sendCallback);
+            }
+        } catch (Throwable e) {
+            sendCallback.onException(e);
+        }
     }
 
     /**
      * Same to {@link #send(Message, SendCallback)} with send timeout specified in addition.
      *
-     * @param msg message to send.
+     * @param msg          message to send.
      * @param sendCallback Callback to execute.
-     * @param timeout send timeout.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout      send timeout.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -399,8 +451,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
      *
      * @param msg Message to send.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -413,32 +465,37 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * Same to {@link #send(Message)} with target message queue specified in addition.
      *
      * @param msg Message to send.
-     * @param mq Target message queue.
+     * @param mq  Target message queue.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
     public SendResult send(Message msg, MessageQueue mq)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq));
+        mq = queueWithNamespace(mq);
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, mq, null);
+        } else {
+            return sendDirect(msg, mq, null);
+        }
     }
 
     /**
      * Same to {@link #send(Message)} with target message queue and send timeout specified.
      *
-     * @param msg Message to send.
-     * @param mq Target message queue.
+     * @param msg     Message to send.
+     * @param mq      Target message queue.
      * @param timeout send timeout.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -451,29 +508,75 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Same to {@link #send(Message, SendCallback)} with target message queue specified.
      *
-     * @param msg Message to send.
-     * @param mq Target message queue.
+     * @param msg          Message to send.
+     * @param mq           Target message queue.
      * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    public SendResult sendDirect(Message msg, MessageQueue mq,
+        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        // send in sync mode
+        if (sendCallback == null) {
+            if (mq == null) {
+                return this.defaultMQProducerImpl.send(msg);
+            } else {
+                return this.defaultMQProducerImpl.send(msg, mq);
+            }
+        } else {
+            if (mq == null) {
+                this.defaultMQProducerImpl.send(msg, sendCallback);
+            } else {
+                this.defaultMQProducerImpl.send(msg, mq, sendCallback);
+            }
+            return null;
+        }
+    }
+
+    public SendResult sendByAccumulator(Message msg, MessageQueue mq,
+        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        // check whether it can batch
+        if (!canBatch(msg)) {
+            return sendDirect(msg, mq, sendCallback);
+        } else {
+            Validators.checkMessage(msg, this);
+            MessageClientIDSetter.setUniqID(msg);
+            if (sendCallback == null) {
+                return this.produceAccumulator.send(msg, mq, this);
+            } else {
+                this.produceAccumulator.send(msg, mq, sendCallback, this);
+                return null;
+            }
+        }
+    }
+
     @Override
     public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
         throws MQClientException, RemotingException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback);
+        mq = queueWithNamespace(mq);
+        try {
+            if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+                sendByAccumulator(msg, mq, sendCallback);
+            } else {
+                sendDirect(msg, mq, sendCallback);
+            }
+        } catch (MQBrokerException e) {
+            // ignore
+        }
     }
 
     /**
      * Same to {@link #send(Message, SendCallback)} with target message queue and send timeout specified.
      *
-     * @param msg Message to send.
-     * @param mq Target message queue.
+     * @param msg          Message to send.
+     * @param mq           Target message queue.
      * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
-     * @param timeout Send timeout.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout      Send timeout.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -487,9 +590,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * Same to {@link #sendOneway(Message)} with target message queue specified.
      *
      * @param msg Message to send.
-     * @param mq Target message queue.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param mq  Target message queue.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -502,35 +605,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Same to {@link #send(Message)} with message queue selector specified.
      *
-     * @param msg Message to send.
+     * @param msg      Message to send.
      * @param selector Message queue selector, through which we get target message queue to deliver message to.
-     * @param arg Argument to work along with message queue selector.
+     * @param arg      Argument to work along with message queue selector.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
     public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg, selector, arg);
+        MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout());
+        mq = queueWithNamespace(mq);
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, mq, null);
+        } else {
+            return sendDirect(msg, mq, null);
+        }
     }
 
     /**
      * Same to {@link #send(Message, MessageQueueSelector, Object)} with send timeout specified.
      *
-     * @param msg Message to send.
+     * @param msg      Message to send.
      * @param selector Message queue selector, through which we get target message queue to deliver message to.
-     * @param arg Argument to work along with message queue selector.
-     * @param timeout Send timeout.
+     * @param arg      Argument to work along with message queue selector.
+     * @param timeout  Send timeout.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -543,31 +652,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Same to {@link #send(Message, SendCallback)} with message queue selector specified.
      *
-     * @param msg Message to send.
-     * @param selector Message selector through which to get target message queue.
-     * @param arg Argument used along with message queue selector.
+     * @param msg          Message to send.
+     * @param selector     Message selector through which to get target message queue.
+     * @param arg          Argument used along with message queue selector.
      * @param sendCallback callback to execute on sending completion.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
     public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
         throws MQClientException, RemotingException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
+        try {
+            MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout());
+            mq = queueWithNamespace(mq);
+            if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+                sendByAccumulator(msg, mq, sendCallback);
+            } else {
+                sendDirect(msg, mq, sendCallback);
+            }
+        } catch (Throwable e) {
+            sendCallback.onException(e);
+        }
     }
 
     /**
      * Same to {@link #send(Message, MessageQueueSelector, Object, SendCallback)} with timeout specified.
      *
-     * @param msg Message to send.
-     * @param selector Message selector through which to get target message queue.
-     * @param arg Argument used along with message queue selector.
+     * @param msg          Message to send.
+     * @param selector     Message selector through which to get target message queue.
+     * @param arg          Argument used along with message queue selector.
      * @param sendCallback callback to execute on sending completion.
-     * @param timeout Send timeout.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout      Send timeout.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -584,13 +703,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
      * delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
      *
-     * @param msg request message to send
+     * @param msg     request message to send
      * @param timeout request timeout
      * @return reply message
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQClientException       if there is any client error.
+     * @throws RemotingException       if there is any network-tier error.
+     * @throws MQBrokerException       if there is any broker error.
+     * @throws InterruptedException    if the thread is interrupted.
      * @throws RequestTimeoutException if request timeout.
      */
     @Override
@@ -603,18 +722,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Request asynchronously. </p>
      * This method returns immediately. On receiving reply message, <code>requestCallback</code> will be executed. </p>
-     *
+     * <p>
      * Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
      * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
      * application developers are the one to resolve this potential issue.
      *
-     * @param msg request message to send
+     * @param msg             request message to send
      * @param requestCallback callback to execute on request completion.
-     * @param timeout request timeout
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout         request timeout
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the thread is interrupted.
-     * @throws MQBrokerException if there is any broker error.
+     * @throws MQBrokerException    if there is any broker error.
      */
     @Override
     public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
@@ -626,15 +745,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Same to {@link #request(Message, long)}  with message queue selector specified.
      *
-     * @param msg request message to send
+     * @param msg      request message to send
      * @param selector message queue selector, through which we get target message queue to deliver message to.
-     * @param arg argument to work along with message queue selector.
-     * @param timeout timeout of request.
+     * @param arg      argument to work along with message queue selector.
+     * @param timeout  timeout of request.
      * @return reply message
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQClientException       if there is any client error.
+     * @throws RemotingException       if there is any network-tier error.
+     * @throws MQBrokerException       if there is any broker error.
+     * @throws InterruptedException    if the thread is interrupted.
      * @throws RequestTimeoutException if request timeout.
      */
     @Override
@@ -648,15 +767,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Same to {@link #request(Message, RequestCallback, long)} with target message selector specified.
      *
-     * @param msg requst message to send
-     * @param selector message queue selector, through which we get target message queue to deliver message to.
-     * @param arg argument to work along with message queue selector.
+     * @param msg             requst message to send
+     * @param selector        message queue selector, through which we get target message queue to deliver message to.
+     * @param arg             argument to work along with message queue selector.
      * @param requestCallback callback to execute on request completion.
-     * @param timeout timeout of request.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout         timeout of request.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the thread is interrupted.
-     * @throws MQBrokerException if there is any broker error.
+     * @throws MQBrokerException    if there is any broker error.
      */
     @Override
     public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
@@ -669,13 +788,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Same to {@link #request(Message, long)}  with target message queue specified in addition.
      *
-     * @param msg request message to send
-     * @param mq target message queue.
+     * @param msg     request message to send
+     * @param mq      target message queue.
      * @param timeout request timeout
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQClientException       if there is any client error.
+     * @throws RemotingException       if there is any network-tier error.
+     * @throws MQBrokerException       if there is any broker error.
+     * @throws InterruptedException    if the thread is interrupted.
      * @throws RequestTimeoutException if request timeout.
      */
     @Override
@@ -688,14 +807,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Same to {@link #request(Message, RequestCallback, long)} with target message queue specified.
      *
-     * @param msg request message to send
-     * @param mq target message queue.
+     * @param msg             request message to send
+     * @param mq              target message queue.
      * @param requestCallback callback to execute on request completion.
-     * @param timeout timeout of request.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout         timeout of request.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the thread is interrupted.
-     * @throws MQBrokerException if there is any broker error.
+     * @throws MQBrokerException    if there is any broker error.
      */
     @Override
     public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
@@ -707,11 +826,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Same to {@link #sendOneway(Message)} with message queue selector specified.
      *
-     * @param msg Message to send.
+     * @param msg      Message to send.
      * @param selector Message queue selector, through which to determine target message queue to deliver message
-     * @param arg Argument used along with message queue selector.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param arg      Argument used along with message queue selector.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -724,9 +843,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * This method is to send transactional messages.
      *
-     * @param msg Transactional message to send.
+     * @param msg          Transactional message to send.
      * @param tranExecuter local transaction executor.
-     * @param arg Argument used along with local transaction executor.
+     * @param arg          Argument used along with local transaction executor.
      * @return Transaction result.
      * @throws MQClientException if there is any client error.
      */
@@ -754,7 +873,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
      *
-     * @param key accesskey
+     * @param key      accesskey
      * @param newTopic topic name
      * @param queueNum topic's queue number
      * @param attributes
@@ -770,9 +889,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * Create a topic on broker. This method will be removed in a certain version after April 5, 2020, so please do not
      * use this method.
      *
-     * @param key accesskey
-     * @param newTopic topic name
-     * @param queueNum topic's queue number
+     * @param key          accesskey
+     * @param newTopic     topic name
+     * @param queueNum     topic's queue number
      * @param topicSysFlag topic system flag
      * @param attributes
      * @throws MQClientException if there is any client error.
@@ -786,7 +905,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     /**
      * Search consume queue offset of the given time stamp.
      *
-     * @param mq Instance of MessageQueue
+     * @param mq        Instance of MessageQueue
      * @param timestamp from when in milliseconds.
      * @return Consume queue offset.
      * @throws MQClientException if there is any client error.
@@ -798,7 +917,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     /**
      * Query maximum offset of the given message queue.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
      *
      * @param mq Instance of MessageQueue
@@ -813,7 +932,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     /**
      * Query minimum offset of the given message queue.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
      *
      * @param mq Instance of MessageQueue
@@ -828,7 +947,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     /**
      * Query earliest message store time.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
      *
      * @param mq Instance of MessageQueue
@@ -843,14 +962,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     /**
      * Query message of the given offset message ID.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
      *
      * @param offsetMsgId message id
      * @return Message specified.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQBrokerException    if there is any broker error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Deprecated
@@ -862,16 +981,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     /**
      * Query message by key.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
      *
-     * @param topic message topic
-     * @param key message key index word
+     * @param topic  message topic
+     * @param key    message key index word
      * @param maxNum max message number
-     * @param begin from when
-     * @param end to when
+     * @param begin  from when
+     * @param end    to when
      * @return QueryResult instance contains matched messages.
-     * @throws MQClientException if there is any client error.
+     * @throws MQClientException    if there is any client error.
      * @throws InterruptedException if the thread is interrupted.
      */
     @Deprecated
@@ -883,15 +1002,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     /**
      * Query message of the given message ID.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
      *
      * @param topic Topic
      * @param msgId Message ID
      * @return Message specified.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQBrokerException    if there is any broker error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Deprecated
@@ -931,7 +1050,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     @Override
-    public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+    public void send(Collection<Message> msgs,
+        SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
     }
 
@@ -949,7 +1069,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     @Override
     public void send(Collection<Message> msgs, MessageQueue mq,
-        SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        SendCallback sendCallback,
+        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout);
     }
 
@@ -998,6 +1119,62 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         return msgBatch;
     }
 
+    public int getBatchMaxDelayMs() {
+        if (this.produceAccumulator == null) {
+            return 0;
+        }
+        return produceAccumulator.getBatchMaxDelayMs();
+    }
+
+    public void batchMaxDelayMs(int holdMs) {
+        if (this.produceAccumulator == null) {
+            throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+        }
+        this.produceAccumulator.batchMaxDelayMs(holdMs);
+    }
+
+    public long getBatchMaxBytes() {
+        if (this.produceAccumulator == null) {
+            return 0;
+        }
+        return produceAccumulator.getBatchMaxBytes();
+    }
+
+    public void batchMaxBytes(long holdSize) {
+        if (this.produceAccumulator == null) {
+            throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+        }
+        this.produceAccumulator.batchMaxBytes(holdSize);
+    }
+
+    public long getTotalBatchMaxBytes() {
+        if (this.produceAccumulator == null) {
+            return 0;
+        }
+        return produceAccumulator.getTotalBatchMaxBytes();
+    }
+
+    public void totalBatchMaxBytes(long totalHoldSize) {
+        if (this.produceAccumulator == null) {
+            throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+        }
+        this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
+    }
+
+    public boolean getAutoBatch() {
+        if (this.produceAccumulator == null) {
+            return false;
+        }
+        return this.autoBatch;
+    }
+
+    public void setAutoBatch(boolean autoBatch) {
+        if (this.produceAccumulator == null) {
+            throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+        }
+        this.autoBatch = autoBatch;
+    }
+
     public String getProducerGroup() {
         return producerGroup;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index f70ddb283..78657e623 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -40,7 +40,7 @@ public interface MQProducer extends MQAdmin {
         RemotingException, MQBrokerException, InterruptedException;
 
     void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
-        RemotingException, InterruptedException;
+        RemotingException, InterruptedException, MQBrokerException;
 
     void send(final Message msg, final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException;
@@ -99,19 +99,23 @@ public interface MQProducer extends MQAdmin {
 
     SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-    
-    void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
+
+    void send(final Collection<Message> msgs,
+        final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
         InterruptedException;
-    
-    void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
+
+    void send(final Collection<Message> msgs, final SendCallback sendCallback,
+        final long timeout) throws MQClientException, RemotingException,
         MQBrokerException, InterruptedException;
-    
-    void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
+
+    void send(final Collection<Message> msgs, final MessageQueue mq,
+        final SendCallback sendCallback) throws MQClientException, RemotingException,
         MQBrokerException, InterruptedException;
-    
-    void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
+
+    void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback,
+        final long timeout) throws MQClientException,
         RemotingException, MQBrokerException, InterruptedException;
-    
+
     //for rpc
     Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
         RemotingException, MQBrokerException, InterruptedException;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
new file mode 100644
index 000000000..6c5ea8531
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProduceAccumulator {
+    // totalHoldSize normal value
+    private long totalHoldSize = 32 * 1024 * 1024;
+    // holdSize normal value
+    private long holdSize = 32 * 1024;
+    // holdMs normal value
+    private int holdMs = 10;
+    private static final InternalLogger log = ClientLogger.getLog();
+    private final GuardForSyncSendService guardThreadForSyncSend;
+    private final GuardForAsyncSendService guardThreadForAsyncSend;
+    private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+    private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+    private AtomicLong currentlyHoldSize = new AtomicLong(0);
+    private final String instanceName;
+
+    public ProduceAccumulator(String instanceName) {
+        this.instanceName = instanceName;
+        this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName);
+        this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName);
+    }
+
+    private class GuardForSyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForSyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = syncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            for (MessageAccumulation v : values) {
+                v.wakeup();
+                synchronized (v) {
+                    synchronized (v.closed) {
+                        if (v.messagesSize.get() == 0) {
+                            v.closed.set(true);
+                            syncSendBatchs.remove(v.aggregateKey, v);
+                        } else {
+                            v.notify();
+                        }
+                    }
+                }
+            }
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    private class GuardForAsyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForAsyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = asyncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            for (MessageAccumulation v : values) {
+                if (v.readyToSend()) {
+                    v.send(null);
+                }
+                synchronized (v.closed) {
+                    if (v.messagesSize.get() == 0) {
+                        v.closed.set(true);
+                        asyncSendBatchs.remove(v.aggregateKey, v);
+                    }
+                }
+            }
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    void start() {
+        guardThreadForSyncSend.start();
+        guardThreadForAsyncSend.start();
+    }
+
+    void shutdown() {
+        guardThreadForSyncSend.shutdown();
+        guardThreadForAsyncSend.shutdown();
+    }
+
+    int getBatchMaxDelayMs() {
+        return holdMs;
+    }
+
+    void batchMaxDelayMs(int holdMs) {
+        if (holdMs <= 0 || holdMs > 30 * 1000) {
+            throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs));
+        }
+        this.holdMs = holdMs;
+    }
+
+    long getBatchMaxBytes() {
+        return holdSize;
+    }
+
+    void batchMaxBytes(long holdSize) {
+        if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) {
+            throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize));
+        }
+        this.holdSize = holdSize;
+    }
+
+    long getTotalBatchMaxBytes() {
+        return holdSize;
+    }
+
+    void totalBatchMaxBytes(long totalHoldSize) {
+        if (totalHoldSize <= 0) {
+            throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize));
+        }
+        this.totalHoldSize = totalHoldSize;
+    }
+
+    private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        MessageAccumulation batch = syncSendBatchs.get(aggregateKey);
+        if (batch != null) {
+            return batch;
+        }
+        batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+        MessageAccumulation previous = syncSendBatchs.putIfAbsent(aggregateKey, batch);
+
+        return previous == null ? batch : previous;
+    }
+
+    private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        MessageAccumulation batch = asyncSendBatchs.get(aggregateKey);
+        if (batch != null) {
+            return batch;
+        }
+        batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+        MessageAccumulation previous = asyncSendBatchs.putIfAbsent(aggregateKey, batch);
+
+        return previous == null ? batch : previous;
+    }
+
+    SendResult send(Message msg,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg);
+        while (true) {
+            MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+            int index = batch.add(msg);
+            if (index == -1) {
+                syncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return batch.sendResults[index];
+            }
+        }
+    }
+
+    SendResult send(Message msg, MessageQueue mq,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg, mq);
+        while (true) {
+            MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+            int index = batch.add(msg);
+            if (index == -1) {
+                syncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return batch.sendResults[index];
+            }
+        }
+    }
+
+    void send(Message msg, SendCallback sendCallback,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg);
+        while (true) {
+            MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+            if (!batch.add(msg, sendCallback)) {
+                asyncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return;
+            }
+        }
+    }
+
+    void send(Message msg, MessageQueue mq,
+        SendCallback sendCallback,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg, mq);
+        while (true) {
+            MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+            if (!batch.add(msg, sendCallback)) {
+                asyncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return;
+            }
+        }
+    }
+
+    boolean tryAddMessage(Message message) {
+        synchronized (currentlyHoldSize) {
+            if (currentlyHoldSize.get() < totalHoldSize) {
+                currentlyHoldSize.addAndGet(message.getBody().length);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    private class AggregateKey {
+        public String topic = null;
+        public MessageQueue mq = null;
+        public boolean waitStoreMsgOK = false;
+        public String tag = null;
+
+        public AggregateKey(Message message) {
+            this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags());
+        }
+
+        public AggregateKey(Message message, MessageQueue mq) {
+            this(message.getTopic(), mq, message.isWaitStoreMsgOK(), message.getTags());
+        }
+
+        public AggregateKey(String topic, MessageQueue mq, boolean waitStoreMsgOK, String tag) {
+            this.topic = topic;
+            this.mq = mq;
+            this.waitStoreMsgOK = waitStoreMsgOK;
+            this.tag = tag;
+        }
+
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            AggregateKey key = (AggregateKey) o;
+            return waitStoreMsgOK == key.waitStoreMsgOK && topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, key.tag);
+        }
+
+        @Override public int hashCode() {
+            return Objects.hash(topic, mq, waitStoreMsgOK, tag);
+        }
+    }
+
+    private class MessageAccumulation {
+        private final DefaultMQProducer defaultMQProducer;
+        private LinkedList<Message> messages;
+        private LinkedList<SendCallback> sendCallbacks;
+        private Set<String> keys;
+        private AtomicBoolean closed;
+        private SendResult[] sendResults;
+        private AggregateKey aggregateKey;
+        private AtomicInteger messagesSize;
+        private int count;
+        private long createTime;
+
+        public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
+            this.defaultMQProducer = defaultMQProducer;
+            this.messages = new LinkedList<Message>();
+            this.sendCallbacks = new LinkedList<SendCallback>();
+            this.keys = new HashSet<String>();
+            this.closed = new AtomicBoolean(false);
+            this.messagesSize = new AtomicInteger(0);
+            this.aggregateKey = aggregateKey;
+            this.count = 0;
+            this.createTime = System.currentTimeMillis();
+        }
+
+        private boolean readyToSend() {
+            if (this.messagesSize.get() > holdSize
+                || System.currentTimeMillis() >= this.createTime + holdMs) {
+                return true;
+            }
+            return false;
+        }
+
+        public int add(
+            Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+            int ret = -1;
+            synchronized (this.closed) {
+                if (this.closed.get()) {
+                    return ret;
+                }
+                ret = this.count++;
+                this.messages.add(msg);
+                messagesSize.addAndGet(msg.getBody().length);
+                String msgKeys = msg.getKeys();
+                if (msgKeys != null) {
+                    this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
+                }
+            }
+            synchronized (this) {
+                while (!this.closed.get()) {
+                    if (readyToSend()) {
+                        this.send();
+                        break;
+                    } else {
+                        this.wait();
+                    }
+                }
+                return ret;
+            }
+        }
+
+        public boolean add(Message msg,
+            SendCallback sendCallback) throws InterruptedException, RemotingException, MQClientException {
+            synchronized (this.closed) {
+                if (this.closed.get()) {
+                    return false;
+                }
+                this.count++;
+                this.messages.add(msg);
+                this.sendCallbacks.add(sendCallback);
+                messagesSize.getAndAdd(msg.getBody().length);
+            }
+            if (readyToSend()) {
+                this.send(sendCallback);
+            }
+            return true;
+
+        }
+
+        public synchronized void wakeup() {
+            if (this.closed.get()) {
+                return;
+            }
+            this.notify();
+        }
+
+        private MessageBatch batch() {
+            MessageBatch messageBatch = new MessageBatch(this.messages);
+            messageBatch.setTopic(this.aggregateKey.topic);
+            messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK);
+            messageBatch.setKeys(this.keys);
+            messageBatch.setTags(this.aggregateKey.tag);
+            messageBatch.setBody(MessageDecoder.encodeMessages(this.messages));
+            return messageBatch;
+        }
+
+        private void splitSendResults(SendResult sendResult) {
+            if (sendResult == null) {
+                throw new IllegalArgumentException("sendResult is null");
+            }
+            boolean isBatchConsumerQueue = !sendResult.getMsgId().contains(",");
+            this.sendResults = new SendResult[this.count];
+            if (!isBatchConsumerQueue) {
+                String[] msgIds = sendResult.getMsgId().split(",");
+                String[] offsetMsgIds = sendResult.getOffsetMsgId().split(",");
+                if (offsetMsgIds.length != this.count || msgIds.length != this.count) {
+                    throw new IllegalArgumentException("sendResult is illegal");
+                }
+                for (int i = 0; i < this.count; i++) {
+                    this.sendResults[i] = new SendResult(sendResult.getSendStatus(), msgIds[i],
+                        sendResult.getMessageQueue(), sendResult.getQueueOffset() + i,
+                        sendResult.getTransactionId(), offsetMsgIds[i], sendResult.getRegionId());
+                }
+            } else {
+                for (int i = 0; i < this.count; i++) {
+                    this.sendResults[i] = sendResult;
+                }
+            }
+        }
+
+        private void send() throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
+            synchronized (this.closed) {
+                if (this.closed.getAndSet(true)) {
+                    return;
+                }
+            }
+            MessageBatch messageBatch = this.batch();
+            SendResult sendResult = null;
+            try {
+                if (defaultMQProducer != null) {
+                    sendResult = defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, null);
+                    this.splitSendResults(sendResult);
+                } else {
+                    throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
+                }
+            } finally {
+                currentlyHoldSize.addAndGet(-messagesSize.get());
+                this.notifyAll();
+            }
+        }
+
+        private void send(SendCallback sendCallback) {
+            synchronized (this.closed) {
+                if (this.closed.getAndSet(true)) {
+                    return;
+                }
+            }
+            MessageBatch messageBatch = this.batch();
+            SendResult sendResult = null;
+            try {
+                if (defaultMQProducer != null) {
+                    final int size = messagesSize.get();
+                    defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, new SendCallback() {
+                        @Override public void onSuccess(SendResult sendResult) {
+                            try {
+                                splitSendResults(sendResult);
+                                int i = 0;
+                                Iterator<SendCallback> it = sendCallbacks.iterator();
+                                while (it.hasNext()) {
+                                    SendCallback v = it.next();
+                                    v.onSuccess(sendResults[i++]);
+                                }
+                                if (i != count) {
+                                    throw new IllegalArgumentException("sendResult is illegal");
+                                }
+                                currentlyHoldSize.addAndGet(-size);
+                            } catch (Exception e) {
+                                onException(e);
+                            }
+                        }
+
+                        @Override public void onException(Throwable e) {
+                            for (SendCallback v : sendCallbacks) {
+                                v.onException(e);
+                            }
+                            currentlyHoldSize.addAndGet(-size);
+                        }
+                    });
+                } else {
+                    throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
+                }
+            } catch (Exception e) {
+                for (SendCallback v : sendCallbacks) {
+                    v.onException(e);
+                }
+            }
+        }
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 7347c2ab2..43a8ffb74 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -229,10 +229,10 @@ public class DefaultMQProducerTest {
         countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
         assertThat(cc.get()).isEqualTo(5);
     }
-    
+
     @Test
     public void testBatchSendMessageAsync()
-            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         final AtomicInteger cc = new AtomicInteger(0);
         final CountDownLatch countDownLatch = new CountDownLatch(4);
 
@@ -458,6 +458,42 @@ public class DefaultMQProducerTest {
         assertThat(cc.get()).isEqualTo(1);
     }
 
+    @Test
+    public void testBatchSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        producer.setAutoBatch(true);
+        producer.send(message, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+                assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+                assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+                countDownLatch.countDown();
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                countDownLatch.countDown();
+            }
+        });
+
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        producer.setAutoBatch(false);
+    }
+
+    @Test
+    public void testBatchSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        producer.setAutoBatch(true);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        SendResult sendResult = producer.send(message);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+        producer.setAutoBatch(false);
+    }
+
     public static TopicRouteData createTopicRoute() {
         TopicRouteData topicRouteData = new TopicRouteData();
 
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
new file mode 100644
index 000000000..183ff704f
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ProduceAccumulatorTest {
+    private boolean compareMessageBatch(MessageBatch a, MessageBatch b) {
+        if (!a.getTopic().equals(b.getTopic())) {
+            return false;
+        }
+        if (!Arrays.equals(a.getBody(), b.getBody())) {
+            return false;
+        }
+        return true;
+    }
+
+    private class MockMQProducer extends DefaultMQProducer {
+        private Message beSendMessage = null;
+        private MessageQueue beSendMessageQueue = null;
+
+        @Override public SendResult sendDirect(Message msg, MessageQueue mq,
+            SendCallback sendCallback) {
+            this.beSendMessage = msg;
+            this.beSendMessageQueue = mq;
+
+            SendResult sendResult = new SendResult();
+            sendResult.setMsgId("123");
+            if (sendCallback != null) {
+                sendCallback.onSuccess(sendResult);
+            }
+            return sendResult;
+        }
+    }
+
+    @Test
+    public void testProduceAccumulator_async() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+        MockMQProducer mockMQProducer = new MockMQProducer();
+
+        ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+        produceAccumulator.start();
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        List<Message> messages = new ArrayList<Message>();
+        messages.add(new Message("testTopic", "1".getBytes()));
+        messages.add(new Message("testTopic", "22".getBytes()));
+        messages.add(new Message("testTopic", "333".getBytes()));
+        messages.add(new Message("testTopic", "4444".getBytes()));
+        messages.add(new Message("testTopic", "55555".getBytes()));
+        for (Message message : messages) {
+            produceAccumulator.send(message, new SendCallback() {
+                final CountDownLatch finalCountDownLatch = countDownLatch;
+
+                @Override public void onSuccess(SendResult sendResult) {
+                    finalCountDownLatch.countDown();
+                }
+
+                @Override public void onException(Throwable e) {
+                    finalCountDownLatch.countDown();
+                }
+            }, mockMQProducer);
+        }
+        assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+        assertThat((mockMQProducer.beSendMessage instanceof MessageBatch)).isTrue();
+
+        MessageBatch messageBatch_1 = (MessageBatch) mockMQProducer.beSendMessage;
+        MessageBatch messageBatch_2 = MessageBatch.generateFromList(messages);
+        messageBatch_2.setBody(messageBatch_2.encode());
+
+        assertThat(compareMessageBatch(messageBatch_1, messageBatch_2)).isTrue();
+    }
+
+    @Test
+    public void testProduceAccumulator_sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+        final MockMQProducer mockMQProducer = new MockMQProducer();
+
+        final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+        produceAccumulator.start();
+
+        List<Message> messages = new ArrayList<Message>();
+        messages.add(new Message("testTopic", "1".getBytes()));
+        messages.add(new Message("testTopic", "22".getBytes()));
+        messages.add(new Message("testTopic", "333".getBytes()));
+        messages.add(new Message("testTopic", "4444".getBytes()));
+        messages.add(new Message("testTopic", "55555".getBytes()));
+        final CountDownLatch countDownLatch = new CountDownLatch(messages.size());
+
+        for (final Message message : messages) {
+            new Thread(new Runnable() {
+                final ProduceAccumulator finalProduceAccumulator = produceAccumulator;
+                final CountDownLatch finalCountDownLatch = countDownLatch;
+                final MockMQProducer finalMockMQProducer = mockMQProducer;
+                final Message finalMessage = message;
+
+                @Override public void run() {
+                    try {
+                        finalProduceAccumulator.send(finalMessage, finalMockMQProducer);
+                        finalCountDownLatch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }).start();
+        }
+        assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+        assertThat((mockMQProducer.beSendMessage instanceof MessageBatch)).isTrue();
+
+        MessageBatch messageBatch_1 = (MessageBatch) mockMQProducer.beSendMessage;
+        MessageBatch messageBatch_2 = MessageBatch.generateFromList(messages);
+        messageBatch_2.setBody(messageBatch_2.encode());
+
+        assertThat(messageBatch_1.getTopic()).isEqualTo(messageBatch_2.getTopic());
+        // The execution order is uncertain, just compare the length
+        assertThat(messageBatch_1.getBody().length).isEqualTo(messageBatch_2.getBody().length);
+    }
+
+    @Test
+    public void testProduceAccumulator_sendWithMessageQueue() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+        MockMQProducer mockMQProducer = new MockMQProducer();
+
+        MessageQueue messageQueue = new MessageQueue("topicTest", "brokerTest", 0);
+        final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+        produceAccumulator.start();
+
+        Message message = new Message("testTopic", "1".getBytes());
+        produceAccumulator.send(message, messageQueue, mockMQProducer);
+        assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        produceAccumulator.send(message, messageQueue, new SendCallback() {
+            @Override public void onSuccess(SendResult sendResult) {
+                countDownLatch.countDown();
+            }
+
+            @Override public void onException(Throwable e) {
+                countDownLatch.countDown();
+            }
+        }, mockMQProducer);
+        assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+        assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
index a6b801eda..47ab7f426 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -27,7 +27,7 @@ public class MessageBatch extends Message implements Iterable<Message> {
     private static final long serialVersionUID = 621335151046335557L;
     private final List<Message> messages;
 
-    private MessageBatch(List<Message> messages) {
+    public MessageBatch(List<Message> messages) {
         this.messages = messages;
     }