You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/01/12 10:04:03 UTC

[rocketmq-client-cpp] branch re_dev updated: feat: fetchPublishMessageQueues for producer

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/re_dev by this push:
     new f1ae40a  feat: fetchPublishMessageQueues for producer
f1ae40a is described below

commit f1ae40ad46bd038753b724096b121bd8344fa741
Author: James Yin <yw...@hotmail.com>
AuthorDate: Tue Jan 12 18:03:18 2021 +0800

    feat: fetchPublishMessageQueues for producer
---
 include/DefaultMQProducer.h            |  2 ++
 include/MQProducer.h                   |  8 +++++++-
 src/producer/DefaultMQProducer.cpp     |  9 ++++++++-
 src/producer/DefaultMQProducerImpl.cpp | 11 +++++++++++
 src/producer/DefaultMQProducerImpl.h   |  2 ++
 5 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 5339db1..a847282 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -39,6 +39,8 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public DefaultMQProducerConfigProxy
   void start() override;
   void shutdown() override;
 
+  std::vector<MQMessageQueue> fetchPublishMessageQueues(const std::string& topic) override;
+
   // Sync: caller will be responsible for the lifecycle of messages.
   SendResult send(MQMessage& msg) override;
   SendResult send(MQMessage& msg, long timeout) override;
diff --git a/include/MQProducer.h b/include/MQProducer.h
index c8226a5..74ea634 100644
--- a/include/MQProducer.h
+++ b/include/MQProducer.h
@@ -17,6 +17,7 @@
 #ifndef ROCKETMQ_MQPRODUCER_H_
 #define ROCKETMQ_MQPRODUCER_H_
 
+#include "MQMessageQueue.h"
 #include "MQSelector.h"
 #include "RequestCallback.h"
 #include "SendCallback.h"
@@ -36,6 +37,8 @@ class ROCKETMQCLIENT_API MQProducer {
   virtual void start() = 0;
   virtual void shutdown() = 0;
 
+  virtual std::vector<MQMessageQueue> fetchPublishMessageQueues(const std::string& topic) = 0;
+
   // Sync
   virtual SendResult send(MQMessage& msg) = 0;
   virtual SendResult send(MQMessage& msg, long timeout) = 0;
@@ -75,7 +78,10 @@ class ROCKETMQCLIENT_API MQProducer {
   virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback) = 0;
   virtual void send(std::vector<MQMessage>& msgs, SendCallback* sendCallback, long timeout) = 0;
   virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback) = 0;
-  virtual void send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) = 0;
+  virtual void send(std::vector<MQMessage>& msgs,
+                    const MQMessageQueue& mq,
+                    SendCallback* sendCallback,
+                    long timeout) = 0;
 
   // RPC
   virtual MQMessage request(MQMessage& msg, long timeout) = 0;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 75deff0..9e4dedb 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -52,6 +52,10 @@ void DefaultMQProducer::shutdown() {
   producer_impl_->shutdown();
 }
 
+std::vector<MQMessageQueue> DefaultMQProducer::fetchPublishMessageQueues(const std::string& topic) {
+  return producer_impl_->fetchPublishMessageQueues(topic);
+}
+
 SendResult DefaultMQProducer::send(MQMessage& msg) {
   return producer_impl_->send(msg);
 }
@@ -155,7 +159,10 @@ void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue&
   producer_impl_->send(msgs, mq, sendCallback);
 }
 
-void DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+void DefaultMQProducer::send(std::vector<MQMessage>& msgs,
+                             const MQMessageQueue& mq,
+                             SendCallback* sendCallback,
+                             long timeout) {
   producer_impl_->send(msgs, mq, sendCallback, timeout);
 }
 
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index dd686cd..4a2b69c 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -18,6 +18,7 @@
 
 #include <cassert>
 #include <typeindex>
+#include "MQMessageQueue.h"
 
 #ifndef WIN32
 #include <signal.h>
@@ -40,6 +41,7 @@
 #include "RequestFutureTable.h"
 #include "TopicPublishInfo.hpp"
 #include "TransactionMQProducer.h"
+#include "UtilAll.h"
 #include "Validators.h"
 #include "protocol/header/CommandHeader.h"
 
@@ -159,6 +161,15 @@ void DefaultMQProducerImpl::shutdown() {
   }
 }
 
+std::vector<MQMessageQueue> DefaultMQProducerImpl::fetchPublishMessageQueues(const std::string& topic) {
+  auto topicPublishInfo = client_instance_->tryToFindTopicPublishInfo(topic);
+  if (topicPublishInfo != nullptr) {
+    return topicPublishInfo->getMessageQueueList();
+  } else {
+    return std::vector<MQMessageQueue>{};
+  }
+}
+
 SendResult DefaultMQProducerImpl::send(MQMessage& msg) {
   return send(msg, dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->send_msg_timeout());
 }
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index f132565..da96df5 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -63,6 +63,8 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
   void start() override;
   void shutdown() override;
 
+  std::vector<MQMessageQueue> fetchPublishMessageQueues(const std::string& topic) override;
+
   // Sync: caller will be responsible for the lifecycle of messages.
   SendResult send(MQMessage& msg) override;
   SendResult send(MQMessage& msg, long timeout) override;