You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/01/08 10:36:14 UTC
[rocketmq-client-cpp] branch re_dev updated (8402eb0 -> 1c868a7)
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a change to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git.
from 8402eb0 chore: update for clangd
new 2cf4836 fix: header of std::bad_alloc
new 1c868a7 feat: support async send for batch message
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
include/Array.h | 2 +-
include/DefaultMQProducer.h | 5 +++++
include/MQProducer.h | 5 +++++
src/producer/DefaultMQProducer.cpp | 16 ++++++++++++++++
src/producer/DefaultMQProducerImpl.cpp | 20 ++++++++++++++++++++
src/producer/DefaultMQProducerImpl.h | 7 ++++++-
6 files changed, 53 insertions(+), 2 deletions(-)
[rocketmq-client-cpp] 01/02: fix: header of std::bad_alloc
Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 2cf4836defad2e17d69a7e149ea2cb7598a8e38d
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Jan 8 18:35:01 2021 +0800
fix: header of std::bad_alloc
---
include/Array.h | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/include/Array.h b/include/Array.h
index c1059cf..20fd7b3 100644
--- a/include/Array.h
+++ b/include/Array.h
@@ -20,7 +20,7 @@
#include <cstdlib> // std::calloc, std::free
#include <cstring> // std::memcpy
-#include <stdexcept> // std::bad_alloc
+#include <new> // std::bad_alloc
#include <type_traits> // std::enable_if, std::is_arithmetic, std::is_pointer, std::is_class
#include "RocketMQClient.h"
[rocketmq-client-cpp] 02/02: feat: support async send for batch
message
Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 1c868a720e78b8af43df28618d5ff997c7cd6e17
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Jan 8 18:35:40 2021 +0800
feat: support async send for batch message
---
include/DefaultMQProducer.h | 5 +++++
include/MQProducer.h | 5 +++++
src/producer/DefaultMQProducer.cpp | 16 ++++++++++++++++
src/producer/DefaultMQProducerImpl.cpp | 20 ++++++++++++++++++++
src/producer/DefaultMQProducerImpl.h | 7 ++++++-
5 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index ec5e5df..5339db1 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -75,6 +75,11 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public DefaultMQProducerConfigProxy
SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) override;
SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) override;
+ void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) override;
+ void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) override;
+ void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) override;
+ void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) override;
+
// RPC
MQMessage request(MQMessage& msg, long timeout) override;
void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;
diff --git a/include/MQProducer.h b/include/MQProducer.h
index 6fc281e..c8226a5 100644
--- a/include/MQProducer.h
+++ b/include/MQProducer.h
@@ -72,6 +72,11 @@ class ROCKETMQCLIENT_API MQProducer {
virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) = 0;
virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) = 0;
+ virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) = 0;
+ virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) = 0;
+ virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) = 0;
+ virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) = 0;
+
// RPC
virtual MQMessage request(MQMessage& msg, long timeout) = 0;
virtual void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) = 0;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 89fd57a..75deff0 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -143,6 +143,22 @@ SendResult DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessage
return producer_impl_->send(msgs, mq, timeout);
}
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) {
+ producer_impl_->send(msgs, sendCallback);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) {
+ producer_impl_->send(msgs, sendCallback, timeout);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) {
+ producer_impl_->send(msgs, mq, sendCallback);
+}
+
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+ producer_impl_->send(msgs, mq, sendCallback, timeout);
+}
+
MQMessage DefaultMQProducer::request(MQMessage& msg, long timeout) {
return producer_impl_->request(msg, timeout);
}
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index b33d052..dd686cd 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -358,6 +358,26 @@ SendResult DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMes
return send(batchMessage, mq, timeout);
}
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) {
+ MQMessage batchMessage(batch(msgs));
+ send(batchMessage, sendCallback);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) {
+ MQMessage batchMessage(batch(msgs));
+ send(batchMessage, sendCallback, timeout);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) {
+ MQMessage batchMessage(batch(msgs));
+ send(batchMessage, mq, sendCallback);
+}
+
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+ MQMessage batchMessage(batch(msgs));
+ send(batchMessage, mq, sendCallback, timeout);
+}
+
MessagePtr DefaultMQProducerImpl::batch(std::vector<MQMessage>& msgs) {
if (msgs.size() < 1) {
THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -1);
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index d93d5ea..f132565 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -93,12 +93,17 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
// Transaction
TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg) override;
- // Batch: power by sync send, caller will be responsible for the lifecycle of messages.
+ // Batch
SendResult send(std::vector<MQMessage>& msgs) override;
SendResult send(std::vector<MQMessage>& msgs, long timeout) override;
SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) override;
SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) override;
+ void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) override;
+ void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) override;
+ void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) override;
+ void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) override;
+
// RPC
MQMessage request(MQMessage& msg, long timeout) override;
void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;