You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/01/08 10:36:16 UTC
[rocketmq-client-cpp] 02/02: feat: support async send for batch message
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 1c868a720e78b8af43df28618d5ff997c7cd6e17
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Jan 8 18:35:40 2021 +0800
feat: support async send for batch message
---
include/DefaultMQProducer.h | 5 +++++
include/MQProducer.h | 5 +++++
src/producer/DefaultMQProducer.cpp | 16 ++++++++++++++++
src/producer/DefaultMQProducerImpl.cpp | 20 ++++++++++++++++++++
src/producer/DefaultMQProducerImpl.h | 7 ++++++-
5 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index ec5e5df..5339db1 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -75,6 +75,11 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public DefaultMQProducerConfigProxy
SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) override;
SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) override;
+ void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) override;
+ void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) override;
+ void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) override;
+ void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) override;
+
// RPC
MQMessage request(MQMessage& msg, long timeout) override;
void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;
diff --git a/include/MQProducer.h b/include/MQProducer.h
index 6fc281e..c8226a5 100644
--- a/include/MQProducer.h
+++ b/include/MQProducer.h
@@ -72,6 +72,11 @@ class ROCKETMQCLIENT_API MQProducer {
virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) = 0;
virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) = 0;
+ virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) = 0;
+ virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) = 0;
+ virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) = 0;
+ virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) = 0;
+
// RPC
virtual MQMessage request(MQMessage& msg, long timeout) = 0;
virtual void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) = 0;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 89fd57a..75deff0 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -143,6 +143,22 @@ SendResult DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessage
return producer_impl_->send(msgs, mq, timeout);
}
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) {
+ producer_impl_->send(msgs, sendCallback);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) {
+ producer_impl_->send(msgs, sendCallback, timeout);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) {
+ producer_impl_->send(msgs, mq, sendCallback);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+ producer_impl_->send(msgs, mq, sendCallback, timeout);
+}
+
MQMessage DefaultMQProducer::request(MQMessage& msg, long timeout) {
return producer_impl_->request(msg, timeout);
}
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index b33d052..dd686cd 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -358,6 +358,26 @@ SendResult DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMes
return send(batchMessage, mq, timeout);
}
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) {
+ MQMessage batchMessage(batch(msgs));
+ send(batchMessage, sendCallback);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) {
+ MQMessage batchMessage(batch(msgs));
+ send(batchMessage, sendCallback, timeout);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) {
+ MQMessage batchMessage(batch(msgs));
+ send(batchMessage, mq, sendCallback);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+ MQMessage batchMessage(batch(msgs));
+ send(batchMessage, mq, sendCallback, timeout);
+}
+
MessagePtr DefaultMQProducerImpl::batch(std::vector<MQMessage>& msgs) {
if (msgs.size() < 1) {
THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -1);
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index d93d5ea..f132565 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -93,12 +93,17 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
// Transaction
TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg) override;
- // Batch: power by sync send, caller will be responsible for the lifecycle of messages.
+ // Batch
SendResult send(std::vector<MQMessage>& msgs) override;
SendResult send(std::vector<MQMessage>& msgs, long timeout) override;
SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) override;
SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) override;
+ void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) override;
+ void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) override;
+ void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) override;
+ void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) override;
+
// RPC
MQMessage request(MQMessage& msg, long timeout) override;
void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;