You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/07/01 02:31:55 UTC
[rocketmq] branch 5.0.0-beta-auto-batch updated: [ISSUE #3717] Auto batching in producer in branch/5.0.0-beta
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-auto-batch
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-auto-batch by this push:
new 001dfa88a [ISSUE #3717] Auto batching in producer in branch/5.0.0-beta
001dfa88a is described below
commit 001dfa88ac57c3e57be7bbe10ec8c7859696cad2
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Fri Jul 1 10:31:50 2022 +0800
[ISSUE #3717] Auto batching in producer in branch/5.0.0-beta
---
.../rocketmq/client/impl/MQClientManager.java | 20 +
.../impl/producer/DefaultMQProducerImpl.java | 37 ++
.../client/producer/DefaultMQProducer.java | 505 +++++++++++++-------
.../rocketmq/client/producer/MQProducer.java | 24 +-
.../client/producer/ProduceAccumulator.java | 508 +++++++++++++++++++++
.../client/producer/DefaultMQProducerTest.java | 40 +-
.../client/producer/ProduceAccumulatorTest.java | 170 +++++++
.../rocketmq/common/message/MessageBatch.java | 2 +-
8 files changed, 1129 insertions(+), 177 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 053c049c9..99ab87f41 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.ProduceAccumulator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
@@ -31,6 +32,8 @@ public class MQClientManager {
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
+ private ConcurrentMap<String/* clientId */, ProduceAccumulator> accumulatorTable =
+ new ConcurrentHashMap<String, ProduceAccumulator>();
private MQClientManager() {
@@ -40,6 +43,23 @@ public class MQClientManager {
return instance;
}
+ public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig clientConfig) {
+ String clientId = clientConfig.buildMQClientId();
+ ProduceAccumulator accumulator = this.accumulatorTable.get(clientId);
+ if (null == accumulator) {
+ accumulator = new ProduceAccumulator(clientId);
+ ProduceAccumulator prev = this.accumulatorTable.putIfAbsent(clientId, accumulator);
+ if (prev != null) {
+ accumulator = prev;
+ log.warn("Returned Previous ProduceAccumulator for clientId:[{}]", clientId);
+ } else {
+ log.info("Created new ProduceAccumulator for clientId:[{}]", clientId);
+ }
+ }
+
+ return accumulator;
+ }
+
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
return getOrCreateMQClientInstance(clientConfig, null);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 325ca4fa6..46fa47545 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -523,6 +523,42 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
+ public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
+ final long timeout) throws MQClientException, RemotingTooMuchRequestException {
+ long beginStartTime = System.currentTimeMillis();
+ this.makeSureStateOK();
+ Validators.checkMessage(msg, this.defaultMQProducer);
+
+ TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+ if (topicPublishInfo != null && topicPublishInfo.ok()) {
+ MessageQueue mq = null;
+ try {
+ List<MessageQueue> messageQueueList =
+ mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
+ Message userMessage = MessageAccessor.cloneMessage(msg);
+ String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
+ userMessage.setTopic(userTopic);
+
+ mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
+ } catch (Throwable e) {
+ throw new MQClientException("select message queue threw exception.", e);
+ }
+
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout < costTime) {
+ throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
+ }
+ if (mq != null) {
+ return mq;
+ } else {
+ throw new MQClientException("select message queue return null.", null);
+ }
+ }
+
+ validateNameServerSetting();
+ throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
+ }
+
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
@@ -988,6 +1024,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
executeEndTransactionHook(context);
}
}
+
/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 13f104529..abe4551f6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
@@ -39,6 +40,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
@@ -51,10 +53,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* This class is the entry point for applications intending to send messages. </p>
- *
+ * <p>
* It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
* box for most scenarios. </p>
- *
+ * <p>
* This class aggregates various <code>send</code> methods to deliver messages to brokers. Each of them has pros and
* cons; you'd better understand strengths and weakness of them before actually coding. </p>
*
@@ -69,20 +71,20 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
private final InternalLogger log = ClientLogger.getLog();
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
- ResponseCode.TOPIC_NOT_EXIST,
- ResponseCode.SERVICE_NOT_AVAILABLE,
- ResponseCode.SYSTEM_ERROR,
- ResponseCode.NO_PERMISSION,
- ResponseCode.NO_BUYER_ID,
- ResponseCode.NOT_IN_CURRENT_UNIT
+ ResponseCode.TOPIC_NOT_EXIST,
+ ResponseCode.SERVICE_NOT_AVAILABLE,
+ ResponseCode.SYSTEM_ERROR,
+ ResponseCode.NO_PERMISSION,
+ ResponseCode.NO_BUYER_ID,
+ ResponseCode.NOT_IN_CURRENT_UNIT
));
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved. </p>
- *
+ * <p>
* For non-transactional messages, it does not matter as long as it's unique per process. </p>
- *
+ * <p>
* See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
*/
private String producerGroup;
@@ -109,14 +111,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
- *
+ * <p>
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
- *
+ * <p>
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
@@ -136,6 +138,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private TraceDispatcher traceDispatcher = null;
+ /**
+ * Switch flag instance for automatic batch message
+ */
+ private boolean autoBatch = false;
+ /**
+ * Instance for batching message automatically
+ */
+ private ProduceAccumulator produceAccumulator = null;
+
/**
* Default constructor.
*/
@@ -164,11 +175,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying producer group.
*
- * @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
- * trace topic name.
+ * trace topic name.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic) {
@@ -178,7 +189,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying producer group.
*
- * @param namespace Namespace for this MQ Producer instance.
+ * @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String namespace, final String producerGroup) {
@@ -189,7 +200,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Constructor specifying both producer group and RPC hook.
*
* @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this(null, producerGroup, rpcHook);
@@ -198,20 +209,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying namespace, producer group and RPC hook.
*
- * @param namespace Namespace for this MQ Producer instance.
+ * @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
/**
* Constructor specifying producer group and enabled msg trace flag.
*
- * @param producerGroup Producer group, see the name-sake field.
+ * @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
@@ -221,10 +233,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.
*
- * @param producerGroup Producer group, see the name-sake field.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
- * trace topic name.
+ * trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
@@ -234,18 +246,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
* name.
*
- * @param namespace Namespace for this MQ Producer instance.
- * @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
- * trace topic name.
+ * trace topic name.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
//if client open the message trace feature
if (enableMsgTrace) {
try {
@@ -269,7 +282,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
}
}
-
+
/**
* Start this producer instance. </p>
*
@@ -282,6 +295,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
+ if (this.produceAccumulator != null) {
+ this.produceAccumulator.start();
+ }
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
@@ -297,6 +313,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void shutdown() {
this.defaultMQProducerImpl.shutdown();
+ this.produceAccumulator.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
@@ -324,28 +341,54 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @param msg Message to send.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
+
+ private boolean canBatch(Message msg) {
+ // produceAccumulator is full
+ if (!produceAccumulator.tryAddMessage(msg)) {
+ return false;
+ }
+ // delay message do not support batch processing
+ if (msg.getDelayTimeLevel() > 0) {
+ return false;
+ }
+ // retry message do not support batch processing
+ if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ return false;
+ }
+ // message which have been assigned to producer group do not support batch processing
+ if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+ return false;
+ }
+ return true;
+ }
+
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- return this.defaultMQProducerImpl.send(msg);
+
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ return sendByAccumulator(msg, null, null);
+ } else {
+ return sendDirect(msg, null, null);
+ }
}
/**
* Same to {@link #send(Message)} with send timeout specified in addition.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param timeout send timeout.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -357,34 +400,43 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Send message to broker asynchronously. </p>
- *
+ * <p>
* This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. </p>
- *
+ * <p>
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
* application developers are the one to resolve this potential issue.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
+
@Override
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQProducerImpl.send(msg, sendCallback);
+ try {
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ sendByAccumulator(msg, null, sendCallback);
+ } else {
+ sendDirect(msg, null, sendCallback);
+ }
+ } catch (Throwable e) {
+ sendCallback.onException(e);
+ }
}
/**
* Same to {@link #send(Message, SendCallback)} with send timeout specified in addition.
*
- * @param msg message to send.
+ * @param msg message to send.
* @param sendCallback Callback to execute.
- * @param timeout send timeout.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout send timeout.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -399,8 +451,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
*
* @param msg Message to send.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -413,32 +465,37 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Same to {@link #send(Message)} with target message queue specified in addition.
*
* @param msg Message to send.
- * @param mq Target message queue.
+ * @param mq Target message queue.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq));
+ mq = queueWithNamespace(mq);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ return sendByAccumulator(msg, mq, null);
+ } else {
+ return sendDirect(msg, mq, null);
+ }
}
/**
* Same to {@link #send(Message)} with target message queue and send timeout specified.
*
- * @param msg Message to send.
- * @param mq Target message queue.
+ * @param msg Message to send.
+ * @param mq Target message queue.
* @param timeout send timeout.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -451,29 +508,75 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #send(Message, SendCallback)} with target message queue specified.
*
- * @param msg Message to send.
- * @param mq Target message queue.
+ * @param msg Message to send.
+ * @param mq Target message queue.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
+
+ public SendResult sendDirect(Message msg, MessageQueue mq,
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ // send in sync mode
+ if (sendCallback == null) {
+ if (mq == null) {
+ return this.defaultMQProducerImpl.send(msg);
+ } else {
+ return this.defaultMQProducerImpl.send(msg, mq);
+ }
+ } else {
+ if (mq == null) {
+ this.defaultMQProducerImpl.send(msg, sendCallback);
+ } else {
+ this.defaultMQProducerImpl.send(msg, mq, sendCallback);
+ }
+ return null;
+ }
+ }
+
+ public SendResult sendByAccumulator(Message msg, MessageQueue mq,
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ // check whether it can batch
+ if (!canBatch(msg)) {
+ return sendDirect(msg, mq, sendCallback);
+ } else {
+ Validators.checkMessage(msg, this);
+ MessageClientIDSetter.setUniqID(msg);
+ if (sendCallback == null) {
+ return this.produceAccumulator.send(msg, mq, this);
+ } else {
+ this.produceAccumulator.send(msg, mq, sendCallback, this);
+ return null;
+ }
+ }
+ }
+
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback);
+ mq = queueWithNamespace(mq);
+ try {
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ sendByAccumulator(msg, mq, sendCallback);
+ } else {
+ sendDirect(msg, mq, sendCallback);
+ }
+ } catch (MQBrokerException e) {
+ // ignore
+ }
}
/**
* Same to {@link #send(Message, SendCallback)} with target message queue and send timeout specified.
*
- * @param msg Message to send.
- * @param mq Target message queue.
+ * @param msg Message to send.
+ * @param mq Target message queue.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
- * @param timeout Send timeout.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout Send timeout.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -487,9 +590,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Same to {@link #sendOneway(Message)} with target message queue specified.
*
* @param msg Message to send.
- * @param mq Target message queue.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param mq Target message queue.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -502,35 +605,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #send(Message)} with message queue selector specified.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param selector Message queue selector, through which we get target message queue to deliver message to.
- * @param arg Argument to work along with message queue selector.
+ * @param arg Argument to work along with message queue selector.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- return this.defaultMQProducerImpl.send(msg, selector, arg);
+ MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout());
+ mq = queueWithNamespace(mq);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ return sendByAccumulator(msg, mq, null);
+ } else {
+ return sendDirect(msg, mq, null);
+ }
}
/**
* Same to {@link #send(Message, MessageQueueSelector, Object)} with send timeout specified.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param selector Message queue selector, through which we get target message queue to deliver message to.
- * @param arg Argument to work along with message queue selector.
- * @param timeout Send timeout.
+ * @param arg Argument to work along with message queue selector.
+ * @param timeout Send timeout.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -543,31 +652,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #send(Message, SendCallback)} with message queue selector specified.
*
- * @param msg Message to send.
- * @param selector Message selector through which to get target message queue.
- * @param arg Argument used along with message queue selector.
+ * @param msg Message to send.
+ * @param selector Message selector through which to get target message queue.
+ * @param arg Argument used along with message queue selector.
* @param sendCallback callback to execute on sending completion.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
+ try {
+ MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout());
+ mq = queueWithNamespace(mq);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ sendByAccumulator(msg, mq, sendCallback);
+ } else {
+ sendDirect(msg, mq, sendCallback);
+ }
+ } catch (Throwable e) {
+ sendCallback.onException(e);
+ }
}
/**
* Same to {@link #send(Message, MessageQueueSelector, Object, SendCallback)} with timeout specified.
*
- * @param msg Message to send.
- * @param selector Message selector through which to get target message queue.
- * @param arg Argument used along with message queue selector.
+ * @param msg Message to send.
+ * @param selector Message selector through which to get target message queue.
+ * @param arg Argument used along with message queue selector.
* @param sendCallback callback to execute on sending completion.
- * @param timeout Send timeout.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout Send timeout.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -584,13 +703,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
* delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
*
- * @param msg request message to send
+ * @param msg request message to send
* @param timeout request timeout
* @return reply message
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
@@ -603,18 +722,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Request asynchronously. </p>
* This method returns immediately. On receiving reply message, <code>requestCallback</code> will be executed. </p>
- *
+ * <p>
* Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
* application developers are the one to resolve this potential issue.
*
- * @param msg request message to send
+ * @param msg request message to send
* @param requestCallback callback to execute on request completion.
- * @param timeout request timeout
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout request timeout
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQBrokerException if there is any broker error.
+ * @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
@@ -626,15 +745,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #request(Message, long)} with message queue selector specified.
*
- * @param msg request message to send
+ * @param msg request message to send
* @param selector message queue selector, through which we get target message queue to deliver message to.
- * @param arg argument to work along with message queue selector.
- * @param timeout timeout of request.
+ * @param arg argument to work along with message queue selector.
+ * @param timeout timeout of request.
* @return reply message
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
@@ -648,15 +767,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #request(Message, RequestCallback, long)} with target message selector specified.
*
- * @param msg requst message to send
- * @param selector message queue selector, through which we get target message queue to deliver message to.
- * @param arg argument to work along with message queue selector.
+ * @param msg requst message to send
+ * @param selector message queue selector, through which we get target message queue to deliver message to.
+ * @param arg argument to work along with message queue selector.
* @param requestCallback callback to execute on request completion.
- * @param timeout timeout of request.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQBrokerException if there is any broker error.
+ * @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
@@ -669,13 +788,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #request(Message, long)} with target message queue specified in addition.
*
- * @param msg request message to send
- * @param mq target message queue.
+ * @param msg request message to send
+ * @param mq target message queue.
* @param timeout request timeout
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
@@ -688,14 +807,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #request(Message, RequestCallback, long)} with target message queue specified.
*
- * @param msg request message to send
- * @param mq target message queue.
+ * @param msg request message to send
+ * @param mq target message queue.
* @param requestCallback callback to execute on request completion.
- * @param timeout timeout of request.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQBrokerException if there is any broker error.
+ * @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
@@ -707,11 +826,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #sendOneway(Message)} with message queue selector specified.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param selector Message queue selector, through which to determine target message queue to deliver message
- * @param arg Argument used along with message queue selector.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param arg Argument used along with message queue selector.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -724,9 +843,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* This method is to send transactional messages.
*
- * @param msg Transactional message to send.
+ * @param msg Transactional message to send.
* @param tranExecuter local transaction executor.
- * @param arg Argument used along with local transaction executor.
+ * @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException if there is any client error.
*/
@@ -754,7 +873,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
- * @param key accesskey
+ * @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
* @param attributes
@@ -770,9 +889,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Create a topic on broker. This method will be removed in a certain version after April 5, 2020, so please do not
* use this method.
*
- * @param key accesskey
- * @param newTopic topic name
- * @param queueNum topic's queue number
+ * @param key accesskey
+ * @param newTopic topic name
+ * @param queueNum topic's queue number
* @param topicSysFlag topic system flag
* @param attributes
* @throws MQClientException if there is any client error.
@@ -786,7 +905,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Search consume queue offset of the given time stamp.
*
- * @param mq Instance of MessageQueue
+ * @param mq Instance of MessageQueue
* @param timestamp from when in milliseconds.
* @return Consume queue offset.
* @throws MQClientException if there is any client error.
@@ -798,7 +917,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query maximum offset of the given message queue.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param mq Instance of MessageQueue
@@ -813,7 +932,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query minimum offset of the given message queue.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param mq Instance of MessageQueue
@@ -828,7 +947,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query earliest message store time.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param mq Instance of MessageQueue
@@ -843,14 +962,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message of the given offset message ID.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param offsetMsgId message id
* @return Message specified.
- * @throws MQBrokerException if there is any broker error.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Deprecated
@@ -862,16 +981,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message by key.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
- * @param topic message topic
- * @param key message key index word
+ * @param topic message topic
+ * @param key message key index word
* @param maxNum max message number
- * @param begin from when
- * @param end to when
+ * @param begin from when
+ * @param end to when
* @return QueryResult instance contains matched messages.
- * @throws MQClientException if there is any client error.
+ * @throws MQClientException if there is any client error.
* @throws InterruptedException if the thread is interrupted.
*/
@Deprecated
@@ -883,15 +1002,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message of the given message ID.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param topic Topic
* @param msgId Message ID
* @return Message specified.
- * @throws MQBrokerException if there is any broker error.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Deprecated
@@ -931,7 +1050,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
@Override
- public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ public void send(Collection<Message> msgs,
+ SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
}
@@ -949,7 +1069,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void send(Collection<Message> msgs, MessageQueue mq,
- SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ SendCallback sendCallback,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout);
}
@@ -998,6 +1119,62 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return msgBatch;
}
+ public int getBatchMaxDelayMs() {
+ if (this.produceAccumulator == null) {
+ return 0;
+ }
+ return produceAccumulator.getBatchMaxDelayMs();
+ }
+
+ public void batchMaxDelayMs(int holdMs) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+ }
+ this.produceAccumulator.batchMaxDelayMs(holdMs);
+ }
+
+ public long getBatchMaxBytes() {
+ if (this.produceAccumulator == null) {
+ return 0;
+ }
+ return produceAccumulator.getBatchMaxBytes();
+ }
+
+ public void batchMaxBytes(long holdSize) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+ }
+ this.produceAccumulator.batchMaxBytes(holdSize);
+ }
+
+ public long getTotalBatchMaxBytes() {
+ if (this.produceAccumulator == null) {
+ return 0;
+ }
+ return produceAccumulator.getTotalBatchMaxBytes();
+ }
+
+ public void totalBatchMaxBytes(long totalHoldSize) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+ }
+ this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
+ }
+
+ public boolean getAutoBatch() {
+ if (this.produceAccumulator == null) {
+ return false;
+ }
+ return this.autoBatch;
+ }
+
+ public void setAutoBatch(boolean autoBatch) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+ }
+ this.autoBatch = autoBatch;
+ }
+
public String getProducerGroup() {
return producerGroup;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index f70ddb283..78657e623 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -40,7 +40,7 @@ public interface MQProducer extends MQAdmin {
RemotingException, MQBrokerException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
- RemotingException, InterruptedException;
+ RemotingException, InterruptedException, MQBrokerException;
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
@@ -99,19 +99,23 @@ public interface MQProducer extends MQAdmin {
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
- void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
+
+ void send(final Collection<Message> msgs,
+ final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
-
- void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
+
+ void send(final Collection<Message> msgs, final SendCallback sendCallback,
+ final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
-
- void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
+
+ void send(final Collection<Message> msgs, final MessageQueue mq,
+ final SendCallback sendCallback) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
-
- void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
+
+ void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback,
+ final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
-
+
//for rpc
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
new file mode 100644
index 000000000..6c5ea8531
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProduceAccumulator {
+ // totalHoldSize normal value
+ private long totalHoldSize = 32 * 1024 * 1024;
+ // holdSize normal value
+ private long holdSize = 32 * 1024;
+ // holdMs normal value
+ private int holdMs = 10;
+ private static final InternalLogger log = ClientLogger.getLog();
+ private final GuardForSyncSendService guardThreadForSyncSend;
+ private final GuardForAsyncSendService guardThreadForAsyncSend;
+ private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+ private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+ private AtomicLong currentlyHoldSize = new AtomicLong(0);
+ private final String instanceName;
+
+ public ProduceAccumulator(String instanceName) {
+ this.instanceName = instanceName;
+ this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName);
+ this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName);
+ }
+
+ private class GuardForSyncSendService extends ServiceThread {
+ private final String serviceName;
+
+ public GuardForSyncSendService(String clientInstanceName) {
+ serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName);
+ }
+
+ @Override public String getServiceName() {
+ return serviceName;
+ }
+
+ @Override public void run() {
+ ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.doWork();
+ } catch (Exception e) {
+ ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ ProduceAccumulator.log.info(this.getServiceName() + " service end");
+ }
+
+ private void doWork() throws Exception {
+ Collection<MessageAccumulation> values = syncSendBatchs.values();
+ final int sleepTime = Math.max(1, holdMs / 2);
+ for (MessageAccumulation v : values) {
+ v.wakeup();
+ synchronized (v) {
+ synchronized (v.closed) {
+ if (v.messagesSize.get() == 0) {
+ v.closed.set(true);
+ syncSendBatchs.remove(v.aggregateKey, v);
+ } else {
+ v.notify();
+ }
+ }
+ }
+ }
+ Thread.sleep(sleepTime);
+ }
+ }
+
+ private class GuardForAsyncSendService extends ServiceThread {
+ private final String serviceName;
+
+ public GuardForAsyncSendService(String clientInstanceName) {
+ serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName);
+ }
+
+ @Override public String getServiceName() {
+ return serviceName;
+ }
+
+ @Override public void run() {
+ ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.doWork();
+ } catch (Exception e) {
+ ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ ProduceAccumulator.log.info(this.getServiceName() + " service end");
+ }
+
+ private void doWork() throws Exception {
+ Collection<MessageAccumulation> values = asyncSendBatchs.values();
+ final int sleepTime = Math.max(1, holdMs / 2);
+ for (MessageAccumulation v : values) {
+ if (v.readyToSend()) {
+ v.send(null);
+ }
+ synchronized (v.closed) {
+ if (v.messagesSize.get() == 0) {
+ v.closed.set(true);
+ asyncSendBatchs.remove(v.aggregateKey, v);
+ }
+ }
+ }
+ Thread.sleep(sleepTime);
+ }
+ }
+
+ void start() {
+ guardThreadForSyncSend.start();
+ guardThreadForAsyncSend.start();
+ }
+
+ void shutdown() {
+ guardThreadForSyncSend.shutdown();
+ guardThreadForAsyncSend.shutdown();
+ }
+
+ int getBatchMaxDelayMs() {
+ return holdMs;
+ }
+
+ void batchMaxDelayMs(int holdMs) {
+ if (holdMs <= 0 || holdMs > 30 * 1000) {
+ throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs));
+ }
+ this.holdMs = holdMs;
+ }
+
+ long getBatchMaxBytes() {
+ return holdSize;
+ }
+
+ void batchMaxBytes(long holdSize) {
+ if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) {
+ throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize));
+ }
+ this.holdSize = holdSize;
+ }
+
+ long getTotalBatchMaxBytes() {
+ return holdSize;
+ }
+
+ void totalBatchMaxBytes(long totalHoldSize) {
+ if (totalHoldSize <= 0) {
+ throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize));
+ }
+ this.totalHoldSize = totalHoldSize;
+ }
+
+ private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey,
+ DefaultMQProducer defaultMQProducer) {
+ MessageAccumulation batch = syncSendBatchs.get(aggregateKey);
+ if (batch != null) {
+ return batch;
+ }
+ batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+ MessageAccumulation previous = syncSendBatchs.putIfAbsent(aggregateKey, batch);
+
+ return previous == null ? batch : previous;
+ }
+
+ private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey,
+ DefaultMQProducer defaultMQProducer) {
+ MessageAccumulation batch = asyncSendBatchs.get(aggregateKey);
+ if (batch != null) {
+ return batch;
+ }
+ batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+ MessageAccumulation previous = asyncSendBatchs.putIfAbsent(aggregateKey, batch);
+
+ return previous == null ? batch : previous;
+ }
+
+ SendResult send(Message msg,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg);
+ while (true) {
+ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+ int index = batch.add(msg);
+ if (index == -1) {
+ syncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return batch.sendResults[index];
+ }
+ }
+ }
+
+ SendResult send(Message msg, MessageQueue mq,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg, mq);
+ while (true) {
+ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+ int index = batch.add(msg);
+ if (index == -1) {
+ syncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return batch.sendResults[index];
+ }
+ }
+ }
+
+ void send(Message msg, SendCallback sendCallback,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg);
+ while (true) {
+ MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+ if (!batch.add(msg, sendCallback)) {
+ asyncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return;
+ }
+ }
+ }
+
+ void send(Message msg, MessageQueue mq,
+ SendCallback sendCallback,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg, mq);
+ while (true) {
+ MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+ if (!batch.add(msg, sendCallback)) {
+ asyncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return;
+ }
+ }
+ }
+
+ boolean tryAddMessage(Message message) {
+ synchronized (currentlyHoldSize) {
+ if (currentlyHoldSize.get() < totalHoldSize) {
+ currentlyHoldSize.addAndGet(message.getBody().length);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ private class AggregateKey {
+ public String topic = null;
+ public MessageQueue mq = null;
+ public boolean waitStoreMsgOK = false;
+ public String tag = null;
+
+ public AggregateKey(Message message) {
+ this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags());
+ }
+
+ public AggregateKey(Message message, MessageQueue mq) {
+ this(message.getTopic(), mq, message.isWaitStoreMsgOK(), message.getTags());
+ }
+
+ public AggregateKey(String topic, MessageQueue mq, boolean waitStoreMsgOK, String tag) {
+ this.topic = topic;
+ this.mq = mq;
+ this.waitStoreMsgOK = waitStoreMsgOK;
+ this.tag = tag;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ AggregateKey key = (AggregateKey) o;
+ return waitStoreMsgOK == key.waitStoreMsgOK && topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, key.tag);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(topic, mq, waitStoreMsgOK, tag);
+ }
+ }
+
+ private class MessageAccumulation {
+ private final DefaultMQProducer defaultMQProducer;
+ private LinkedList<Message> messages;
+ private LinkedList<SendCallback> sendCallbacks;
+ private Set<String> keys;
+ private AtomicBoolean closed;
+ private SendResult[] sendResults;
+ private AggregateKey aggregateKey;
+ private AtomicInteger messagesSize;
+ private int count;
+ private long createTime;
+
+ public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
+ this.defaultMQProducer = defaultMQProducer;
+ this.messages = new LinkedList<Message>();
+ this.sendCallbacks = new LinkedList<SendCallback>();
+ this.keys = new HashSet<String>();
+ this.closed = new AtomicBoolean(false);
+ this.messagesSize = new AtomicInteger(0);
+ this.aggregateKey = aggregateKey;
+ this.count = 0;
+ this.createTime = System.currentTimeMillis();
+ }
+
+ private boolean readyToSend() {
+ if (this.messagesSize.get() > holdSize
+ || System.currentTimeMillis() >= this.createTime + holdMs) {
+ return true;
+ }
+ return false;
+ }
+
+ public int add(
+ Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+ int ret = -1;
+ synchronized (this.closed) {
+ if (this.closed.get()) {
+ return ret;
+ }
+ ret = this.count++;
+ this.messages.add(msg);
+ messagesSize.addAndGet(msg.getBody().length);
+ String msgKeys = msg.getKeys();
+ if (msgKeys != null) {
+ this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
+ }
+ }
+ synchronized (this) {
+ while (!this.closed.get()) {
+ if (readyToSend()) {
+ this.send();
+ break;
+ } else {
+ this.wait();
+ }
+ }
+ return ret;
+ }
+ }
+
+ public boolean add(Message msg,
+ SendCallback sendCallback) throws InterruptedException, RemotingException, MQClientException {
+ synchronized (this.closed) {
+ if (this.closed.get()) {
+ return false;
+ }
+ this.count++;
+ this.messages.add(msg);
+ this.sendCallbacks.add(sendCallback);
+ messagesSize.getAndAdd(msg.getBody().length);
+ }
+ if (readyToSend()) {
+ this.send(sendCallback);
+ }
+ return true;
+
+ }
+
+ public synchronized void wakeup() {
+ if (this.closed.get()) {
+ return;
+ }
+ this.notify();
+ }
+
+ private MessageBatch batch() {
+ MessageBatch messageBatch = new MessageBatch(this.messages);
+ messageBatch.setTopic(this.aggregateKey.topic);
+ messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK);
+ messageBatch.setKeys(this.keys);
+ messageBatch.setTags(this.aggregateKey.tag);
+ messageBatch.setBody(MessageDecoder.encodeMessages(this.messages));
+ return messageBatch;
+ }
+
+ private void splitSendResults(SendResult sendResult) {
+ if (sendResult == null) {
+ throw new IllegalArgumentException("sendResult is null");
+ }
+ boolean isBatchConsumerQueue = !sendResult.getMsgId().contains(",");
+ this.sendResults = new SendResult[this.count];
+ if (!isBatchConsumerQueue) {
+ String[] msgIds = sendResult.getMsgId().split(",");
+ String[] offsetMsgIds = sendResult.getOffsetMsgId().split(",");
+ if (offsetMsgIds.length != this.count || msgIds.length != this.count) {
+ throw new IllegalArgumentException("sendResult is illegal");
+ }
+ for (int i = 0; i < this.count; i++) {
+ this.sendResults[i] = new SendResult(sendResult.getSendStatus(), msgIds[i],
+ sendResult.getMessageQueue(), sendResult.getQueueOffset() + i,
+ sendResult.getTransactionId(), offsetMsgIds[i], sendResult.getRegionId());
+ }
+ } else {
+ for (int i = 0; i < this.count; i++) {
+ this.sendResults[i] = sendResult;
+ }
+ }
+ }
+
+ private void send() throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
+ synchronized (this.closed) {
+ if (this.closed.getAndSet(true)) {
+ return;
+ }
+ }
+ MessageBatch messageBatch = this.batch();
+ SendResult sendResult = null;
+ try {
+ if (defaultMQProducer != null) {
+ sendResult = defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, null);
+ this.splitSendResults(sendResult);
+ } else {
+ throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
+ }
+ } finally {
+ currentlyHoldSize.addAndGet(-messagesSize.get());
+ this.notifyAll();
+ }
+ }
+
+ private void send(SendCallback sendCallback) {
+ synchronized (this.closed) {
+ if (this.closed.getAndSet(true)) {
+ return;
+ }
+ }
+ MessageBatch messageBatch = this.batch();
+ SendResult sendResult = null;
+ try {
+ if (defaultMQProducer != null) {
+ final int size = messagesSize.get();
+ defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, new SendCallback() {
+ @Override public void onSuccess(SendResult sendResult) {
+ try {
+ splitSendResults(sendResult);
+ int i = 0;
+ Iterator<SendCallback> it = sendCallbacks.iterator();
+ while (it.hasNext()) {
+ SendCallback v = it.next();
+ v.onSuccess(sendResults[i++]);
+ }
+ if (i != count) {
+ throw new IllegalArgumentException("sendResult is illegal");
+ }
+ currentlyHoldSize.addAndGet(-size);
+ } catch (Exception e) {
+ onException(e);
+ }
+ }
+
+ @Override public void onException(Throwable e) {
+ for (SendCallback v : sendCallbacks) {
+ v.onException(e);
+ }
+ currentlyHoldSize.addAndGet(-size);
+ }
+ });
+ } else {
+ throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
+ }
+ } catch (Exception e) {
+ for (SendCallback v : sendCallbacks) {
+ v.onException(e);
+ }
+ }
+ }
+ }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 7347c2ab2..43a8ffb74 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -229,10 +229,10 @@ public class DefaultMQProducerTest {
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(5);
}
-
+
@Test
public void testBatchSendMessageAsync()
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(4);
@@ -458,6 +458,42 @@ public class DefaultMQProducerTest {
assertThat(cc.get()).isEqualTo(1);
}
+ @Test
+ public void testBatchSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ producer.setAutoBatch(true);
+ producer.send(message, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ countDownLatch.countDown();
+ }
+ });
+
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ producer.setAutoBatch(false);
+ }
+
+ @Test
+ public void testBatchSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+ producer.setAutoBatch(true);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ SendResult sendResult = producer.send(message);
+
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ producer.setAutoBatch(false);
+ }
+
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
new file mode 100644
index 000000000..183ff704f
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ProduceAccumulatorTest {
+ private boolean compareMessageBatch(MessageBatch a, MessageBatch b) {
+ if (!a.getTopic().equals(b.getTopic())) {
+ return false;
+ }
+ if (!Arrays.equals(a.getBody(), b.getBody())) {
+ return false;
+ }
+ return true;
+ }
+
+ private class MockMQProducer extends DefaultMQProducer {
+ private Message beSendMessage = null;
+ private MessageQueue beSendMessageQueue = null;
+
+ @Override public SendResult sendDirect(Message msg, MessageQueue mq,
+ SendCallback sendCallback) {
+ this.beSendMessage = msg;
+ this.beSendMessageQueue = mq;
+
+ SendResult sendResult = new SendResult();
+ sendResult.setMsgId("123");
+ if (sendCallback != null) {
+ sendCallback.onSuccess(sendResult);
+ }
+ return sendResult;
+ }
+ }
+
+ @Test
+ public void testProduceAccumulator_async() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ MockMQProducer mockMQProducer = new MockMQProducer();
+
+ ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+ produceAccumulator.start();
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ List<Message> messages = new ArrayList<Message>();
+ messages.add(new Message("testTopic", "1".getBytes()));
+ messages.add(new Message("testTopic", "22".getBytes()));
+ messages.add(new Message("testTopic", "333".getBytes()));
+ messages.add(new Message("testTopic", "4444".getBytes()));
+ messages.add(new Message("testTopic", "55555".getBytes()));
+ for (Message message : messages) {
+ produceAccumulator.send(message, new SendCallback() {
+ final CountDownLatch finalCountDownLatch = countDownLatch;
+
+ @Override public void onSuccess(SendResult sendResult) {
+ finalCountDownLatch.countDown();
+ }
+
+ @Override public void onException(Throwable e) {
+ finalCountDownLatch.countDown();
+ }
+ }, mockMQProducer);
+ }
+ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+ assertThat((mockMQProducer.beSendMessage instanceof MessageBatch)).isTrue();
+
+ MessageBatch messageBatch_1 = (MessageBatch) mockMQProducer.beSendMessage;
+ MessageBatch messageBatch_2 = MessageBatch.generateFromList(messages);
+ messageBatch_2.setBody(messageBatch_2.encode());
+
+ assertThat(compareMessageBatch(messageBatch_1, messageBatch_2)).isTrue();
+ }
+
+ @Test
+ public void testProduceAccumulator_sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ final MockMQProducer mockMQProducer = new MockMQProducer();
+
+ final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+ produceAccumulator.start();
+
+ List<Message> messages = new ArrayList<Message>();
+ messages.add(new Message("testTopic", "1".getBytes()));
+ messages.add(new Message("testTopic", "22".getBytes()));
+ messages.add(new Message("testTopic", "333".getBytes()));
+ messages.add(new Message("testTopic", "4444".getBytes()));
+ messages.add(new Message("testTopic", "55555".getBytes()));
+ final CountDownLatch countDownLatch = new CountDownLatch(messages.size());
+
+ for (final Message message : messages) {
+ new Thread(new Runnable() {
+ final ProduceAccumulator finalProduceAccumulator = produceAccumulator;
+ final CountDownLatch finalCountDownLatch = countDownLatch;
+ final MockMQProducer finalMockMQProducer = mockMQProducer;
+ final Message finalMessage = message;
+
+ @Override public void run() {
+ try {
+ finalProduceAccumulator.send(finalMessage, finalMockMQProducer);
+ finalCountDownLatch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+ }
+ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+ assertThat((mockMQProducer.beSendMessage instanceof MessageBatch)).isTrue();
+
+ MessageBatch messageBatch_1 = (MessageBatch) mockMQProducer.beSendMessage;
+ MessageBatch messageBatch_2 = MessageBatch.generateFromList(messages);
+ messageBatch_2.setBody(messageBatch_2.encode());
+
+ assertThat(messageBatch_1.getTopic()).isEqualTo(messageBatch_2.getTopic());
+ // The execution order is uncertain, just compare the length
+ assertThat(messageBatch_1.getBody().length).isEqualTo(messageBatch_2.getBody().length);
+ }
+
+ @Test
+ public void testProduceAccumulator_sendWithMessageQueue() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ MockMQProducer mockMQProducer = new MockMQProducer();
+
+ MessageQueue messageQueue = new MessageQueue("topicTest", "brokerTest", 0);
+ final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+ produceAccumulator.start();
+
+ Message message = new Message("testTopic", "1".getBytes());
+ produceAccumulator.send(message, messageQueue, mockMQProducer);
+ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ produceAccumulator.send(message, messageQueue, new SendCallback() {
+ @Override public void onSuccess(SendResult sendResult) {
+ countDownLatch.countDown();
+ }
+
+ @Override public void onException(Throwable e) {
+ countDownLatch.countDown();
+ }
+ }, mockMQProducer);
+ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
index a6b801eda..47ab7f426 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -27,7 +27,7 @@ public class MessageBatch extends Message implements Iterable<Message> {
private static final long serialVersionUID = 621335151046335557L;
private final List<Message> messages;
- private MessageBatch(List<Message> messages) {
+ public MessageBatch(List<Message> messages) {
this.messages = messages;
}