You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/01/08 10:36:16 UTC

[rocketmq-client-cpp] 02/02: feat: support async send for batch message

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 1c868a720e78b8af43df28618d5ff997c7cd6e17
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Jan 8 18:35:40 2021 +0800

    feat: support async send for batch message
---
 include/DefaultMQProducer.h            |  5 +++++
 include/MQProducer.h                   |  5 +++++
 src/producer/DefaultMQProducer.cpp     | 16 ++++++++++++++++
 src/producer/DefaultMQProducerImpl.cpp | 20 ++++++++++++++++++++
 src/producer/DefaultMQProducerImpl.h   |  7 ++++++-
 5 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index ec5e5df..5339db1 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -75,6 +75,11 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public DefaultMQProducerConfigProxy
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) override;
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) override;
 
+  void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) override;
+  void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) override;
+  void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) override;
+  void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) override;
+
   // RPC
   MQMessage request(MQMessage& msg, long timeout) override;
   void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;
diff --git a/include/MQProducer.h b/include/MQProducer.h
index 6fc281e..c8226a5 100644
--- a/include/MQProducer.h
+++ b/include/MQProducer.h
@@ -72,6 +72,11 @@ class ROCKETMQCLIENT_API MQProducer {
   virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) = 0;
   virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) = 0;
 
+  virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) = 0;
+  virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) = 0;
+  virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) = 0;
+  virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) = 0;
+
   // RPC
   virtual MQMessage request(MQMessage& msg, long timeout) = 0;
   virtual void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) = 0;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 89fd57a..75deff0 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -143,6 +143,22 @@ SendResult DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessage
   return producer_impl_->send(msgs, mq, timeout);
 }
 
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) {
+  producer_impl_->send(msgs, sendCallback);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) {
+  producer_impl_->send(msgs, sendCallback, timeout);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) {
+  producer_impl_->send(msgs, mq, sendCallback);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+  producer_impl_->send(msgs, mq, sendCallback, timeout);
+}
+
 MQMessage DefaultMQProducer::request(MQMessage& msg, long timeout) {
   return producer_impl_->request(msg, timeout);
 }
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index b33d052..dd686cd 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -358,6 +358,26 @@ SendResult DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMes
   return send(batchMessage, mq, timeout);
 }
 
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) {
+  MQMessage batchMessage(batch(msgs));
+  send(batchMessage, sendCallback);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) {
+  MQMessage batchMessage(batch(msgs));
+  send(batchMessage, sendCallback, timeout);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) {
+  MQMessage batchMessage(batch(msgs));
+  send(batchMessage, mq, sendCallback);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+  MQMessage batchMessage(batch(msgs));
+  send(batchMessage, mq, sendCallback, timeout);
+}
+
 MessagePtr DefaultMQProducerImpl::batch(std::vector<MQMessage>& msgs) {
   if (msgs.size() < 1) {
     THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -1);
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index d93d5ea..f132565 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -93,12 +93,17 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
   // Transaction
   TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg) override;
 
-  // Batch: power by sync send, caller will be responsible for the lifecycle of messages.
+  // Batch
   SendResult send(std::vector<MQMessage>& msgs) override;
   SendResult send(std::vector<MQMessage>& msgs, long timeout) override;
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) override;
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) override;
 
+  void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) override;
+  void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) override;
+  void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) override;
+  void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) override;
+
   // RPC
   MQMessage request(MQMessage& msg, long timeout) override;
   void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;