You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/01/06 06:21:01 UTC

[pulsar] branch master updated: add listenerName support to client-cpp (#9119)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d7feb3  add listenerName support to client-cpp (#9119)
7d7feb3 is described below

commit 7d7feb36068e2764c6d691e551a4caa53fefb33d
Author: z2665 <z2...@hotmail.com>
AuthorDate: Wed Jan 6 14:20:37 2021 +0800

    add listenerName support to client-cpp (#9119)
    
    ### Motivation
    
    since pulsar 2.6.1 java client support the listenerName to support access pular server with advertisedListeners
    but client-cpp not support
    
    ### Modifications
    
    1. add setListenerName/getListenerName to ClientConfiguration
    2. add ListenerName to TopicLookup in  BinaryProtoLookupService
---
 pulsar-client-cpp/include/pulsar/ClientConfiguration.h |  3 +++
 pulsar-client-cpp/lib/BinaryProtoLookupService.cc      | 17 +++++++++++++----
 pulsar-client-cpp/lib/BinaryProtoLookupService.h       |  7 ++++++-
 pulsar-client-cpp/lib/ClientConfiguration.cc           |  7 +++++++
 pulsar-client-cpp/lib/ClientConfigurationImpl.h        |  1 +
 pulsar-client-cpp/lib/ClientConnection.cc              |  5 +++--
 pulsar-client-cpp/lib/ClientConnection.h               |  4 ++--
 pulsar-client-cpp/lib/ClientImpl.cc                    |  3 ++-
 pulsar-client-cpp/lib/Commands.cc                      |  4 +++-
 pulsar-client-cpp/lib/Commands.h                       |  3 ++-
 10 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index 9bb63d4..9a5ecb7 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -141,6 +141,9 @@ class PULSAR_PUBLIC ClientConfiguration {
     ClientConfiguration& setValidateHostName(bool validateHostName);
     bool isValidateHostName() const;
 
+    ClientConfiguration& setListenerName(const std::string& listenerName);
+    const std::string& getListenerName() const;
+
     /*
      * Initialize stats interval in seconds. Stats are printed and reset after every 'statsIntervalInSeconds'.
      * Set to 0 in order to disable stats collection.
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index 160efc9..069e0e2 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -36,6 +36,14 @@ namespace pulsar {
 BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl)
     : cnxPool_(cnxPool), serviceUrl_(lookupUrl), mutex_(), requestIdGenerator_(0) {}
 
+BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl,
+                                                   const std::string& listenerName)
+    : cnxPool_(cnxPool),
+      serviceUrl_(lookupUrl),
+      listenerName_(listenerName),
+      mutex_(),
+      requestIdGenerator_(0) {}
+
 /*
  * @param topicName topic name to get broker for
  *
@@ -53,7 +61,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::lookupAsync(const
     LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
     Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
     future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, lookupName, false,
-                                 std::placeholders::_1, std::placeholders::_2, promise));
+                                 listenerName_, std::placeholders::_1, std::placeholders::_2, promise));
     return promise->getFuture();
 }
 
@@ -76,7 +84,8 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
 }
 
 void BinaryProtoLookupService::sendTopicLookupRequest(const std::string& topicName, bool authoritative,
-                                                      Result result, const ClientConnectionWeakPtr& clientCnx,
+                                                      const std::string& listenerName, Result result,
+                                                      const ClientConnectionWeakPtr& clientCnx,
                                                       LookupDataResultPromisePtr promise) {
     if (result != ResultOk) {
         promise->setFailed(ResultConnectError);
@@ -85,7 +94,7 @@ void BinaryProtoLookupService::sendTopicLookupRequest(const std::string& topicNa
     LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
     ClientConnectionPtr conn = clientCnx.lock();
     uint64_t requestId = newRequestId();
-    conn->newTopicLookup(topicName, authoritative, requestId, lookupPromise);
+    conn->newTopicLookup(topicName, authoritative, listenerName, requestId, lookupPromise);
     lookupPromise->getFuture().addListener(std::bind(&BinaryProtoLookupService::handleLookup, this, topicName,
                                                      std::placeholders::_1, std::placeholders::_2, clientCnx,
                                                      promise));
@@ -105,7 +114,7 @@ void BinaryProtoLookupService::handleLookup(const std::string& topicName, Result
             Future<Result, ClientConnectionWeakPtr> future =
                 cnxPool_.getConnectionAsync(logicalAddress, physicalAddress);
             future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, topicName,
-                                         data->isAuthoritative(), std::placeholders::_1,
+                                         data->isAuthoritative(), listenerName_, std::placeholders::_1,
                                          std::placeholders::_2, promise));
         } else {
             LOG_DEBUG("Lookup response for " << topicName << ", lookup-broker-url " << data->getBrokerUrl());
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
index 4a6fba2..c4cf0d0 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
@@ -37,6 +37,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
      */
     BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl);
 
+    BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl,
+                             const std::string& listenerName);
+
     Future<Result, LookupDataResultPtr> lookupAsync(const std::string& topicName);
 
     Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName);
@@ -49,8 +52,10 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
 
     std::string serviceUrl_;
     ConnectionPool& cnxPool_;
+    std::string listenerName_;
 
-    void sendTopicLookupRequest(const std::string& topicName, bool authoritative, Result result,
+    void sendTopicLookupRequest(const std::string& topicName, bool authoritative,
+                                const std::string& listenerName, Result result,
                                 const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise);
 
     void handleLookup(const std::string& topicName, Result result, LookupDataResultPtr data,
diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc
index ad210f6..a70c22e 100644
--- a/pulsar-client-cpp/lib/ClientConfiguration.cc
+++ b/pulsar-client-cpp/lib/ClientConfiguration.cc
@@ -126,4 +126,11 @@ ClientConfiguration& ClientConfiguration::setPartititionsUpdateInterval(unsigned
 unsigned int ClientConfiguration::getPartitionsUpdateInterval() const {
     return impl_->partitionsUpdateInterval;
 }
+
+ClientConfiguration& ClientConfiguration::setListenerName(const std::string& listenerName) {
+    impl_->listenerName = listenerName;
+    return *this;
+}
+
+const std::string& ClientConfiguration::getListenerName() const { return impl_->listenerName; }
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
index 11cd8c9..4dc4c23 100644
--- a/pulsar-client-cpp/lib/ClientConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
@@ -37,6 +37,7 @@ struct ClientConfigurationImpl {
     std::unique_ptr<LoggerFactory> loggerFactory;
     bool validateHostName;
     unsigned int partitionsUpdateInterval;
+    std::string listenerName;
 
     ClientConfigurationImpl()
         : authenticationPtr(AuthFactory::Disabled()),
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index d2fb659..b4d1bd1 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -1186,8 +1186,9 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
 }
 
 void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative,
-                                      const uint64_t requestId, LookupDataResultPromisePtr promise) {
-    newLookup(Commands::newLookup(topicName, authoritative, requestId), requestId, promise);
+                                      const std::string& listenerName, const uint64_t requestId,
+                                      LookupDataResultPromisePtr promise) {
+    newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise);
 }
 
 void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, const uint64_t requestId,
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 99d9d45..71db1ad 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -120,8 +120,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
 
     Future<Result, ClientConnectionWeakPtr> getCloseFuture();
 
-    void newTopicLookup(const std::string& topicName, bool authoritative, const uint64_t requestId,
-                        LookupDataResultPromisePtr promise);
+    void newTopicLookup(const std::string& topicName, bool authoritative, const std::string& listenerName,
+                        const uint64_t requestId, LookupDataResultPromisePtr promise);
 
     void newPartitionedMetadataLookup(const std::string& topicName, const uint64_t requestId,
                                       LookupDataResultPromisePtr promise);
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 83af939..af144ab 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -116,7 +116,8 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
                                                 std::cref(clientConfiguration_.getAuthPtr()));
     } else {
         LOG_DEBUG("Using Binary Lookup");
-        lookupServicePtr_ = std::make_shared<BinaryProtoLookupService>(std::ref(pool_), std::ref(serviceUrl));
+        lookupServicePtr_ = std::make_shared<BinaryProtoLookupService>(
+            std::ref(pool_), std::ref(serviceUrl), std::cref(clientConfiguration_.getListenerName()));
     }
 }
 
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 4df47c8..009c19e 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -108,7 +108,8 @@ SharedBuffer Commands::newPartitionMetadataRequest(const std::string& topic, uin
     return buffer;
 }
 
-SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritative, uint64_t requestId) {
+SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
+                                 const std::string& listenerName) {
     static BaseCommand cmd;
     static std::mutex mutex;
     std::lock_guard<std::mutex> lock(mutex);
@@ -117,6 +118,7 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat
     lookup->set_topic(topic);
     lookup->set_authoritative(authoritative);
     lookup->set_request_id(requestId);
+    lookup->set_advertised_listener_name(listenerName);
     const SharedBuffer buffer = writeMessageWithSize(cmd);
     cmd.clear_lookuptopic();
     return buffer;
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 18f0049..abcb44b 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -76,7 +76,8 @@ class Commands {
 
     static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId);
 
-    static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId);
+    static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
+                                  const std::string& listenerName);
 
     static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId,
                                     uint64_t sequenceId, ChecksumType checksumType, const Message& msg);