You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/01/12 10:04:03 UTC
[rocketmq-client-cpp] branch re_dev updated: feat: fetchPublishMessageQueues for producer
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/re_dev by this push:
new f1ae40a feat: fetchPublishMessageQueues for producer
f1ae40a is described below
commit f1ae40ad46bd038753b724096b121bd8344fa741
Author: James Yin <yw...@hotmail.com>
AuthorDate: Tue Jan 12 18:03:18 2021 +0800
feat: fetchPublishMessageQueues for producer
---
include/DefaultMQProducer.h | 2 ++
include/MQProducer.h | 8 +++++++-
src/producer/DefaultMQProducer.cpp | 9 ++++++++-
src/producer/DefaultMQProducerImpl.cpp | 11 +++++++++++
src/producer/DefaultMQProducerImpl.h | 2 ++
5 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 5339db1..a847282 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -39,6 +39,8 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public DefaultMQProducerConfigProxy
void start() override;
void shutdown() override;
+ std::vector<MQMessageQueue> fetchPublishMessageQueues(const std::string& topic) override;
+
// Sync: caller will be responsible for the lifecycle of messages.
SendResult send(MQMessage& msg) override;
SendResult send(MQMessage& msg, long timeout) override;
diff --git a/include/MQProducer.h b/include/MQProducer.h
index c8226a5..74ea634 100644
--- a/include/MQProducer.h
+++ b/include/MQProducer.h
@@ -17,6 +17,7 @@
#ifndef ROCKETMQ_MQPRODUCER_H_
#define ROCKETMQ_MQPRODUCER_H_
+#include "MQMessageQueue.h"
#include "MQSelector.h"
#include "RequestCallback.h"
#include "SendCallback.h"
@@ -36,6 +37,8 @@ class ROCKETMQCLIENT_API MQProducer {
virtual void start() = 0;
virtual void shutdown() = 0;
+ virtual std::vector<MQMessageQueue> fetchPublishMessageQueues(const std::string& topic) = 0;
+
// Sync
virtual SendResult send(MQMessage& msg) = 0;
virtual SendResult send(MQMessage& msg, long timeout) = 0;
@@ -75,7 +78,10 @@ class ROCKETMQCLIENT_API MQProducer {
virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) = 0;
virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) = 0;
virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) = 0;
- virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) = 0;
+ virtual void send(std::vector<MQMessage>& msgs,
+ const MQMessageQueue& mq,
+ SendCallback* sendCallback,
+ long timeout) = 0;
// RPC
virtual MQMessage request(MQMessage& msg, long timeout) = 0;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 75deff0..9e4dedb 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -52,6 +52,10 @@ void DefaultMQProducer::shutdown() {
producer_impl_->shutdown();
}
+std::vector<MQMessageQueue> DefaultMQProducer::fetchPublishMessageQueues(const std::string& topic) {
+ return producer_impl_->fetchPublishMessageQueues(topic);
+}
+
SendResult DefaultMQProducer::send(MQMessage& msg) {
return producer_impl_->send(msg);
}
@@ -155,7 +159,10 @@ void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue&
producer_impl_->send(msgs, mq, sendCallback);
}
-void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs,
+ const MQMessageQueue& mq,
+ SendCallback* sendCallback,
+ long timeout) {
producer_impl_->send(msgs, mq, sendCallback, timeout);
}
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index dd686cd..4a2b69c 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -18,6 +18,7 @@
#include <cassert>
#include <typeindex>
+#include "MQMessageQueue.h"
#ifndef WIN32
#include <signal.h>
@@ -40,6 +41,7 @@
#include "RequestFutureTable.h"
#include "TopicPublishInfo.hpp"
#include "TransactionMQProducer.h"
+#include "UtilAll.h"
#include "Validators.h"
#include "protocol/header/CommandHeader.h"
@@ -159,6 +161,15 @@ void DefaultMQProducerImpl::shutdown() {
}
}
+std::vector<MQMessageQueue> DefaultMQProducerImpl::fetchPublishMessageQueues(const std::string& topic) {
+ auto topicPublishInfo = client_instance_->tryToFindTopicPublishInfo(topic);
+ if (topicPublishInfo != nullptr) {
+ return topicPublishInfo->getMessageQueueList();
+ } else {
+ return std::vector<MQMessageQueue>{};
+ }
+}
+
SendResult DefaultMQProducerImpl::send(MQMessage& msg) {
return send(msg, dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->send_msg_timeout());
}
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index f132565..da96df5 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -63,6 +63,8 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
void start() override;
void shutdown() override;
+ std::vector<MQMessageQueue> fetchPublishMessageQueues(const std::string& topic) override;
+
// Sync: caller will be responsible for the lifecycle of messages.
SendResult send(MQMessage& msg) override;
SendResult send(MQMessage& msg, long timeout) override;