You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/01/08 10:36:14 UTC

[rocketmq-client-cpp] branch re_dev updated (8402eb0 -> 1c868a7)

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a change to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git.


    from 8402eb0  chore: update for clangd
     new 2cf4836  fix: header of std::bad_alloc
     new 1c868a7  feat: support async send for batch message

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/Array.h                        |  2 +-
 include/DefaultMQProducer.h            |  5 +++++
 include/MQProducer.h                   |  5 +++++
 src/producer/DefaultMQProducer.cpp     | 16 ++++++++++++++++
 src/producer/DefaultMQProducerImpl.cpp | 20 ++++++++++++++++++++
 src/producer/DefaultMQProducerImpl.h   |  7 ++++++-
 6 files changed, 53 insertions(+), 2 deletions(-)


[rocketmq-client-cpp] 01/02: fix: header of std::bad_alloc

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 2cf4836defad2e17d69a7e149ea2cb7598a8e38d
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Jan 8 18:35:01 2021 +0800

    fix: header of std::bad_alloc
---
 include/Array.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/include/Array.h b/include/Array.h
index c1059cf..20fd7b3 100644
--- a/include/Array.h
+++ b/include/Array.h
@@ -20,7 +20,7 @@
 #include <cstdlib>  // std::calloc, std::free
 #include <cstring>  // std::memcpy
 
-#include <stdexcept>    // std::bad_alloc
+#include <new>          // std::bad_alloc
 #include <type_traits>  // std::enable_if, std::is_arithmetic, std::is_pointer, std::is_class
 
 #include "RocketMQClient.h"


[rocketmq-client-cpp] 02/02: feat: support async send for batch message

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 1c868a720e78b8af43df28618d5ff997c7cd6e17
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Jan 8 18:35:40 2021 +0800

    feat: support async send for batch message
---
 include/DefaultMQProducer.h            |  5 +++++
 include/MQProducer.h                   |  5 +++++
 src/producer/DefaultMQProducer.cpp     | 16 ++++++++++++++++
 src/producer/DefaultMQProducerImpl.cpp | 20 ++++++++++++++++++++
 src/producer/DefaultMQProducerImpl.h   |  7 ++++++-
 5 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index ec5e5df..5339db1 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -75,6 +75,11 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public DefaultMQProducerConfigProxy
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) override;
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) override;
 
+  void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) override;
+  void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) override;
+  void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) override;
+  void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) override;
+
   // RPC
   MQMessage request(MQMessage& msg, long timeout) override;
   void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;
diff --git a/include/MQProducer.h b/include/MQProducer.h
index 6fc281e..c8226a5 100644
--- a/include/MQProducer.h
+++ b/include/MQProducer.h
@@ -72,6 +72,11 @@ class ROCKETMQCLIENT_API MQProducer {
   virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) = 0;
   virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) = 0;
 
+  virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) = 0;
+  virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) = 0;
+  virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) = 0;
+  virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) = 0;
+
   // RPC
   virtual MQMessage request(MQMessage& msg, long timeout) = 0;
   virtual void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) = 0;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 89fd57a..75deff0 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -143,6 +143,22 @@ SendResult DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessage
   return producer_impl_->send(msgs, mq, timeout);
 }
 
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) {
+  producer_impl_->send(msgs, sendCallback);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) {
+  producer_impl_->send(msgs, sendCallback, timeout);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) {
+  producer_impl_->send(msgs, mq, sendCallback);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+  producer_impl_->send(msgs, mq, sendCallback, timeout);
+}
+
 MQMessage DefaultMQProducer::request(MQMessage& msg, long timeout) {
   return producer_impl_->request(msg, timeout);
 }
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index b33d052..dd686cd 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -358,6 +358,26 @@ SendResult DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMes
   return send(batchMessage, mq, timeout);
 }
 
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) {
+  MQMessage batchMessage(batch(msgs));
+  send(batchMessage, sendCallback);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) {
+  MQMessage batchMessage(batch(msgs));
+  send(batchMessage, sendCallback, timeout);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) {
+  MQMessage batchMessage(batch(msgs));
+  send(batchMessage, mq, sendCallback);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+  MQMessage batchMessage(batch(msgs));
+  send(batchMessage, mq, sendCallback, timeout);
+}
+
 MessagePtr DefaultMQProducerImpl::batch(std::vector<MQMessage>& msgs) {
   if (msgs.size() < 1) {
     THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -1);
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index d93d5ea..f132565 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -93,12 +93,17 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
   // Transaction
   TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg) override;
 
-  // Batch: power by sync send, caller will be responsible for the lifecycle of messages.
+  // Batch
   SendResult send(std::vector<MQMessage>& msgs) override;
   SendResult send(std::vector<MQMessage>& msgs, long timeout) override;
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) override;
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) override;
 
+  void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) override;
+  void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) override;
+  void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) override;
+  void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) override;
+
   // RPC
   MQMessage request(MQMessage& msg, long timeout) override;
   void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;