You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/08/30 15:14:14 UTC

[pulsar] branch master updated: [feat][cpp] Support multiple brokers in service URL (#17162)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 786ab3078ff [feat][cpp] Support multiple brokers in service URL (#17162)
786ab3078ff is described below

commit 786ab3078ffd1874cc7f48cc1a4a48acfda3410a
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Aug 30 23:14:07 2022 +0800

    [feat][cpp] Support multiple brokers in service URL (#17162)
    
    ### Motivation
    
    It's a catch-up for https://github.com/apache/pulsar/pull/3249.
    
    Currently the C++ client doesn't strictly validate the service URL.
    If multiple brokers are specified, only the first broker is used.
    
    ### Modifications
    
    - Add `ServiceURI` to support configuring multiple brokers in the
      service URL. See `ServiceURITest` for how to configure it.
    - Add `ServiceNameResolver`, whose `resolveHost` method selects the
      next broker in a round-robin way.
    - Since the broker address used for topic lookup is no longer the
      service URL itself, add a `getBroker` method (like the method of the
      same name in Java's `LookupService`) to replace `lookupAsync`. It
      returns a future of a pair of logical and physical addresses, which
      handles the case when a proxy is enabled.
    - Apply `ServiceNameResolver` to `ClientImpl` and the
      `LookupService` implementations.
    - Rename `BinaryLookupServiceTest` to `LookupServiceTest` and add
      `testMultiAddresses` to verify that both `BinaryProtoLookupService`
      and `HTTPLookupService` access all available brokers in a
      round-robin way.
    
    ### TODO
    
    This is the first part of supporting multiple brokers. Even with this
    patch, if one of these hosts is not available, the topic lookup will
    fail immediately instead of trying another broker. Since this patch
    already includes many code changes, I will push another patch to fix it.
---
 pulsar-client-cpp/include/pulsar/Client.h          |   2 +
 pulsar-client-cpp/lib/BinaryProtoLookupService.cc  | 138 +++++++++------------
 pulsar-client-cpp/lib/BinaryProtoLookupService.h   |  29 ++---
 pulsar-client-cpp/lib/ClientImpl.cc                |  80 +++++-------
 pulsar-client-cpp/lib/ClientImpl.h                 |   7 +-
 pulsar-client-cpp/lib/ConnectionPool.h             |   4 +
 pulsar-client-cpp/lib/HTTPLookupService.cc         |  71 +++++------
 pulsar-client-cpp/lib/HTTPLookupService.h          |  11 +-
 pulsar-client-cpp/lib/LookupService.h              |  23 +++-
 pulsar-client-cpp/lib/PulsarScheme.h               |  82 ++++++++++++
 pulsar-client-cpp/lib/ServiceNameResolver.h        |  57 +++++++++
 pulsar-client-cpp/lib/ServiceURI.cc                | 101 +++++++++++++++
 pulsar-client-cpp/lib/ServiceURI.h                 |  50 ++++++++
 pulsar-client-cpp/lib/TopicName.cc                 |  12 +-
 pulsar-client-cpp/lib/TopicName.h                  |  12 +-
 pulsar-client-cpp/python/pulsar_test.py            |   6 +-
 pulsar-client-cpp/run-unit-tests.sh                |   4 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |  28 +----
 ...ryLookupServiceTest.cc => LookupServiceTest.cc} |  78 ++++++++++--
 pulsar-client-cpp/tests/ServiceURITest.cc          |  75 +++++++++++
 20 files changed, 631 insertions(+), 239 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h
index 49dd011963b..3edb03b5bad 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -50,6 +50,7 @@ class PULSAR_PUBLIC Client {
      * configuration.
      *
      * @param serviceUrl the Pulsar endpoint to use (eg: pulsar://localhost:6650)
+     * @throw std::invalid_argument if `serviceUrl` is invalid
      */
     Client(const std::string& serviceUrl);
 
@@ -60,6 +61,7 @@ class PULSAR_PUBLIC Client {
      * @param serviceUrl the Pulsar endpoint to use (eg:
      * http://brokerv2-pdev.messaging.corp.gq1.yahoo.com:4080 for Sandbox access)
      * @param clientConfiguration the client configuration to use
+     * @throw std::invalid_argument if `serviceUrl` is invalid
      */
     Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
 
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index 716ff91c58a..bff29237c71 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -29,35 +29,61 @@ DECLARE_LOG_OBJECT()
 
 namespace pulsar {
 
-/*
- * @param lookupUrl service url to do lookup
- * Constructor
- */
-BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl)
-    : serviceUrl_(lookupUrl), cnxPool_(cnxPool) {}
-
-BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl,
-                                                   const std::string& listenerName)
-    : serviceUrl_(lookupUrl), listenerName_(listenerName), cnxPool_(cnxPool) {}
+auto BinaryProtoLookupService::getBroker(const TopicName& topicName) -> LookupResultFuture {
+    return findBroker(serviceNameResolver_.resolveHost(), false, topicName.toString());
+}
 
-/*
- * @param topicName topic name to get broker for
- *
- * Looks up the owner broker for the given topic name
- */
-Future<Result, LookupDataResultPtr> BinaryProtoLookupService::lookupAsync(const std::string& topic) {
-    TopicNamePtr topicName = TopicName::get(topic);
-    if (!topicName) {
-        LOG_ERROR("Unable to parse topic - " << topic);
-        LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
-        promise->setFailed(ResultInvalidTopicName);
-        return promise->getFuture();
-    }
-    std::string lookupName = topicName->toString();
-    LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
-    Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
-    future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, lookupName, false,
-                                 listenerName_, std::placeholders::_1, std::placeholders::_2, promise));
+auto BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative,
+                                          const std::string& topic) -> LookupResultFuture {
+    LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic);
+    auto promise = std::make_shared<Promise<Result, LookupResult>>();
+    // NOTE: we can use move capture for topic since C++14
+    cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address](
+                                                         Result result,
+                                                         const ClientConnectionWeakPtr& weakCnx) {
+        if (result != ResultOk) {
+            promise->setFailed(result);
+            return;
+        }
+        auto cnx = weakCnx.lock();
+        if (!cnx) {
+            LOG_ERROR("Connection to " << address << " is expired before lookup");
+            promise->setFailed(ResultNotConnected);
+            return;
+        }
+        auto lookupPromise = std::make_shared<LookupDataResultPromise>();
+        cnx->newTopicLookup(topic, false, listenerName_, newRequestId(), lookupPromise);
+        lookupPromise->getFuture().addListener([this, cnx, promise, topic, address](
+                                                   Result result, const LookupDataResultPtr& data) {
+            if (result != ResultOk || !data) {
+                LOG_ERROR("Lookup failed for " << topic << ", result " << result);
+                promise->setFailed(result);
+                return;
+            }
+
+            const auto responseBrokerAddress =
+                (serviceNameResolver_.useTls() ? data->getBrokerUrlTls() : data->getBrokerUrl());
+            if (data->isRedirect()) {
+                LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress);
+                findBroker(responseBrokerAddress, data->isAuthoritative(), topic)
+                    .addListener([promise](Result result, const LookupResult& value) {
+                        if (result == ResultOk) {
+                            promise->setValue(value);
+                        } else {
+                            promise->setFailed(result);
+                        }
+                    });
+            } else {
+                LOG_DEBUG("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl());
+                if (data->shouldProxyThroughServiceUrl()) {
+                    // logicalAddress is the proxy's address, we should still connect through proxy
+                    promise->setValue({responseBrokerAddress, address});
+                } else {
+                    promise->setValue({responseBrokerAddress, responseBrokerAddress});
+                }
+            }
+        });
+    });
     return promise->getFuture();
 }
 
@@ -73,55 +99,13 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
         return promise->getFuture();
     }
     std::string lookupName = topicName->toString();
-    Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
-    future.addListener(std::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
-                                 lookupName, std::placeholders::_1, std::placeholders::_2, promise));
+    const auto address = serviceNameResolver_.resolveHost();
+    cnxPool_.getConnectionAsync(address, address)
+        .addListener(std::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
+                               lookupName, std::placeholders::_1, std::placeholders::_2, promise));
     return promise->getFuture();
 }
 
-void BinaryProtoLookupService::sendTopicLookupRequest(const std::string& topicName, bool authoritative,
-                                                      const std::string& listenerName, Result result,
-                                                      const ClientConnectionWeakPtr& clientCnx,
-                                                      LookupDataResultPromisePtr promise) {
-    if (result != ResultOk) {
-        promise->setFailed(ResultConnectError);
-        return;
-    }
-    LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
-    ClientConnectionPtr conn = clientCnx.lock();
-    uint64_t requestId = newRequestId();
-    conn->newTopicLookup(topicName, authoritative, listenerName, requestId, lookupPromise);
-    lookupPromise->getFuture().addListener(std::bind(&BinaryProtoLookupService::handleLookup, this, topicName,
-                                                     std::placeholders::_1, std::placeholders::_2, clientCnx,
-                                                     promise));
-}
-
-void BinaryProtoLookupService::handleLookup(const std::string& topicName, Result result,
-                                            LookupDataResultPtr data,
-                                            const ClientConnectionWeakPtr& clientCnx,
-                                            LookupDataResultPromisePtr promise) {
-    if (data) {
-        if (data->isRedirect()) {
-            LOG_DEBUG("Lookup request is for " << topicName << " redirected to " << data->getBrokerUrl());
-
-            const std::string& logicalAddress = data->getBrokerUrl();
-            const std::string& physicalAddress =
-                data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
-            Future<Result, ClientConnectionWeakPtr> future =
-                cnxPool_.getConnectionAsync(logicalAddress, physicalAddress);
-            future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, topicName,
-                                         data->isAuthoritative(), listenerName_, std::placeholders::_1,
-                                         std::placeholders::_2, promise));
-        } else {
-            LOG_DEBUG("Lookup response for " << topicName << ", lookup-broker-url " << data->getBrokerUrl());
-            promise->setValue(data);
-        }
-    } else {
-        LOG_DEBUG("Lookup failed for " << topicName << ", result " << result);
-        promise->setFailed(result);
-    }
-}
-
 void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName, Result result,
                                                                   const ClientConnectionWeakPtr& clientCnx,
                                                                   LookupDataResultPromisePtr promise) {
@@ -166,9 +150,9 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
         return promise->getFuture();
     }
     std::string namespaceName = nsName->toString();
-    Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
-    future.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
-                                 namespaceName, std::placeholders::_1, std::placeholders::_2, promise));
+    cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
+        .addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
+                               namespaceName, std::placeholders::_1, std::placeholders::_2, promise));
     return promise->getFuture();
 }
 
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
index bbf76325956..d068c3d0e64 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
@@ -26,40 +26,33 @@
 #include "Backoff.h"
 #include <lib/LookupService.h>
 #include <mutex>
+#include "ServiceNameResolver.h"
 
 namespace pulsar {
 class LookupDataResult;
 
 class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
    public:
-    /*
-     * constructor
-     */
-    BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl);
+    BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
+                             const std::string& listenerName)
+        : serviceNameResolver_(serviceNameResolver), cnxPool_(pool), listenerName_(listenerName) {}
 
-    BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl,
-                             const std::string& listenerName);
+    LookupResultFuture getBroker(const TopicName& topicName) override;
 
-    Future<Result, LookupDataResultPtr> lookupAsync(const std::string& topicName);
+    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;
 
-    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName);
-
-    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName);
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;
 
    private:
     std::mutex mutex_;
     uint64_t requestIdGenerator_ = 0;
 
-    std::string serviceUrl_;
-    std::string listenerName_;
+    ServiceNameResolver& serviceNameResolver_;
     ConnectionPool& cnxPool_;
+    std::string listenerName_;
 
-    void sendTopicLookupRequest(const std::string& topicName, bool authoritative,
-                                const std::string& listenerName, Result result,
-                                const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise);
-
-    void handleLookup(const std::string& topicName, Result result, LookupDataResultPtr data,
-                      const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise);
+    // TODO: limit the redirect count, see https://github.com/apache/pulsar/pull/7096
+    LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic);
 
     void sendPartitionMetadataLookupRequest(const std::string& topicName, Result result,
                                             const ClientConnectionWeakPtr& clientCnx,
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index d15e247347a..c68d6a951b4 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -73,25 +73,12 @@ typedef std::unique_lock<std::mutex> Lock;
 
 typedef std::vector<std::string> StringList;
 
-static const std::string https("https");
-static const std::string pulsarSsl("pulsar+ssl");
-
-static const ClientConfiguration detectTls(const std::string& serviceUrl,
-                                           const ClientConfiguration& clientConfiguration) {
-    ClientConfiguration conf(clientConfiguration);
-    if (serviceUrl.compare(0, https.size(), https) == 0 ||
-        serviceUrl.compare(0, pulsarSsl.size(), pulsarSsl) == 0) {
-        conf.setUseTls(true);
-    }
-    return conf;
-}
-
 ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
                        bool poolConnections)
     : mutex_(),
       state_(Open),
-      serviceUrl_(serviceUrl),
-      clientConfiguration_(detectTls(serviceUrl, clientConfiguration)),
+      serviceNameResolver_(serviceUrl),
+      clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())),
       memoryLimitController_(clientConfiguration.getMemoryLimit()),
       ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
       listenerExecutorProvider_(
@@ -120,15 +107,16 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
     }
     LogUtils::setLoggerFactory(std::move(loggerFactory));
 
-    if (serviceUrl_.compare(0, 4, "http") == 0) {
+    if (serviceNameResolver_.useHttp()) {
         LOG_DEBUG("Using HTTP Lookup");
-        lookupServicePtr_ =
-            std::make_shared<HTTPLookupService>(std::cref(serviceUrl_), std::cref(clientConfiguration_),
-                                                std::cref(clientConfiguration_.getAuthPtr()));
+        lookupServicePtr_ = std::make_shared<HTTPLookupService>(std::ref(serviceNameResolver_),
+                                                                std::cref(clientConfiguration_),
+                                                                std::cref(clientConfiguration_.getAuthPtr()));
     } else {
         LOG_DEBUG("Using Binary Lookup");
-        lookupServicePtr_ = std::make_shared<BinaryProtoLookupService>(
-            std::ref(pool_), std::ref(serviceUrl), std::cref(clientConfiguration_.getListenerName()));
+        lookupServicePtr_ =
+            std::make_shared<BinaryProtoLookupService>(std::ref(serviceNameResolver_), std::ref(pool_),
+                                                       std::cref(clientConfiguration_.getListenerName()));
     }
 }
 
@@ -414,36 +402,32 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
 
 Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
     Promise<Result, ClientConnectionWeakPtr> promise;
-    lookupServicePtr_->lookupAsync(topic).addListener(std::bind(&ClientImpl::handleLookup, shared_from_this(),
-                                                                std::placeholders::_1, std::placeholders::_2,
-                                                                promise));
-    return promise.getFuture();
-}
 
-void ClientImpl::handleLookup(Result result, LookupDataResultPtr data,
-                              Promise<Result, ClientConnectionWeakPtr> promise) {
-    if (data) {
-        const std::string& logicalAddress =
-            clientConfiguration_.isUseTls() ? data->getBrokerUrlTls() : data->getBrokerUrl();
-        LOG_DEBUG("Getting connection to broker: " << logicalAddress);
-        const std::string& physicalAddress =
-            data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
-        Future<Result, ClientConnectionWeakPtr> future =
-            pool_.getConnectionAsync(logicalAddress, physicalAddress);
-        future.addListener(std::bind(&ClientImpl::handleNewConnection, shared_from_this(),
-                                     std::placeholders::_1, std::placeholders::_2, promise));
-    } else {
-        promise.setFailed(result);
+    const auto topicNamePtr = TopicName::get(topic);
+    if (!topicNamePtr) {
+        LOG_ERROR("Unable to parse topic - " << topic);
+        promise.setFailed(ResultInvalidTopicName);
+        return promise.getFuture();
     }
-}
 
-void ClientImpl::handleNewConnection(Result result, const ClientConnectionWeakPtr& conn,
-                                     Promise<Result, ClientConnectionWeakPtr> promise) {
-    if (result == ResultOk) {
-        promise.setValue(conn);
-    } else {
-        promise.setFailed(ResultConnectError);
-    }
+    auto self = shared_from_this();
+    lookupServicePtr_->getBroker(*topicNamePtr)
+        .addListener([this, self, promise](Result result, const LookupService::LookupResult& data) {
+            if (result != ResultOk) {
+                promise.setFailed(result);
+                return;
+            }
+            pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
+                .addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
+                    if (result == ResultOk) {
+                        promise.setValue(weakCnx);
+                    } else {
+                        promise.setFailed(result);
+                    }
+                });
+        });
+
+    return promise.getFuture();
 }
 
 void ClientImpl::handleGetPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 847872abfd2..1f046e2427c 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -31,6 +31,7 @@
 #include "ConsumerImplBase.h"
 #include <atomic>
 #include <vector>
+#include "ServiceNameResolver.h"
 
 namespace pulsar {
 
@@ -69,10 +70,6 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
 
     Future<Result, ClientConnectionWeakPtr> getConnection(const std::string& topic);
-    void handleLookup(Result result, LookupDataResultPtr data,
-                      Promise<Result, ClientConnectionWeakPtr> promise);
-    void handleNewConnection(Result result, const ClientConnectionWeakPtr& conn,
-                             Promise<Result, ClientConnectionWeakPtr> promise);
 
     void closeAsync(CloseCallback callback);
     void shutdown();
@@ -134,7 +131,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     std::mutex mutex_;
 
     State state_;
-    std::string serviceUrl_;
+    ServiceNameResolver serviceNameResolver_;
     ClientConfiguration clientConfiguration_;
     MemoryLimitController memoryLimitController_;
 
diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h
index 5032e809d55..21d439e7326 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.h
+++ b/pulsar-client-cpp/lib/ConnectionPool.h
@@ -63,6 +63,10 @@ class PULSAR_PUBLIC ConnectionPool {
     Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
                                                                const std::string& physicalAddress);
 
+    Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& address) {
+        return getConnectionAsync(address, address);
+    }
+
    private:
     ClientConfiguration clientConfiguration_;
     ExecutorServiceProviderPtr executorProvider_;
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 340f67c050e..5b986632392 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -48,47 +48,47 @@ HTTPLookupService::CurlInitializer::~CurlInitializer() { curl_global_cleanup();
 
 HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer;
 
-HTTPLookupService::HTTPLookupService(const std::string &lookupUrl,
+HTTPLookupService::HTTPLookupService(ServiceNameResolver &serviceNameResolver,
                                      const ClientConfiguration &clientConfiguration,
                                      const AuthenticationPtr &authData)
     : executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
+      serviceNameResolver_(serviceNameResolver),
       authenticationPtr_(authData),
       lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
       tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()),
       isUseTls_(clientConfiguration.isUseTls()),
       tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()),
-      tlsValidateHostname_(clientConfiguration.isValidateHostName()) {
-    if (lookupUrl[lookupUrl.length() - 1] == '/') {
-        // Remove trailing '/'
-        adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1);
-    } else {
-        adminUrl_ = lookupUrl;
-    }
-}
+      tlsValidateHostname_(clientConfiguration.isValidateHostName()) {}
 
-Future<Result, LookupDataResultPtr> HTTPLookupService::lookupAsync(const std::string &topic) {
-    LookupPromise promise;
-    std::shared_ptr<TopicName> topicName = TopicName::get(topic);
-    if (!topicName) {
-        LOG_ERROR("Unable to parse topic - " << topic);
-        promise.setFailed(ResultInvalidTopicName);
-        return promise.getFuture();
-    }
+auto HTTPLookupService::getBroker(const TopicName &topicName) -> LookupResultFuture {
+    LookupResultPromise promise;
 
+    const auto &url = serviceNameResolver_.resolveHost();
     std::stringstream completeUrlStream;
-    if (topicName->isV2Topic()) {
-        completeUrlStream << adminUrl_ << V2_PATH << topicName->getDomain() << "/" << topicName->getProperty()
-                          << '/' << topicName->getNamespacePortion() << '/'
-                          << topicName->getEncodedLocalName();
+    if (topicName.isV2Topic()) {
+        completeUrlStream << url << V2_PATH << topicName.getDomain() << "/" << topicName.getProperty() << '/'
+                          << topicName.getNamespacePortion() << '/' << topicName.getEncodedLocalName();
     } else {
-        completeUrlStream << adminUrl_ << V1_PATH << topicName->getDomain() << "/" << topicName->getProperty()
-                          << '/' << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/'
-                          << topicName->getEncodedLocalName();
+        completeUrlStream << url << V1_PATH << topicName.getDomain() << "/" << topicName.getProperty() << '/'
+                          << topicName.getCluster() << '/' << topicName.getNamespacePortion() << '/'
+                          << topicName.getEncodedLocalName();
     }
 
-    executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
-                                                 shared_from_this(), promise, completeUrlStream.str(),
-                                                 Lookup));
+    const auto completeUrl = completeUrlStream.str();
+    auto self = shared_from_this();
+    executorProvider_->get()->postWork([this, self, promise, completeUrl] {
+        std::string responseData;
+        Result result = sendHTTPRequest(completeUrl, responseData);
+
+        if (result != ResultOk) {
+            promise.setFailed(result);
+        } else {
+            const auto lookupDataResultPtr = parseLookupData(responseData);
+            const auto brokerAddress = (serviceNameResolver_.useTls() ? lookupDataResultPtr->getBrokerUrlTls()
+                                                                      : lookupDataResultPtr->getBrokerUrl());
+            promise.setValue({brokerAddress, brokerAddress});
+        }
+    });
     return promise.getFuture();
 }
 
@@ -97,15 +97,15 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync
     LookupPromise promise;
     std::stringstream completeUrlStream;
 
+    const auto &url = serviceNameResolver_.resolveHost();
     if (topicName->isV2Topic()) {
-        completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << topicName->getDomain() << '/'
-                          << topicName->getProperty() << '/' << topicName->getNamespacePortion() << '/'
+        completeUrlStream << url << ADMIN_PATH_V2 << topicName->getDomain() << '/' << topicName->getProperty()
+                          << '/' << topicName->getNamespacePortion() << '/'
                           << topicName->getEncodedLocalName() << '/' << PARTITION_METHOD_NAME;
     } else {
-        completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << topicName->getDomain() << '/'
-                          << topicName->getProperty() << '/' << topicName->getCluster() << '/'
-                          << topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName()
-                          << '/' << PARTITION_METHOD_NAME;
+        completeUrlStream << url << ADMIN_PATH_V1 << topicName->getDomain() << '/' << topicName->getProperty()
+                          << '/' << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/'
+                          << topicName->getEncodedLocalName() << '/' << PARTITION_METHOD_NAME;
     }
 
     completeUrlStream << "?checkAllowAutoCreation=true";
@@ -120,11 +120,12 @@ Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync(
     NamespaceTopicsPromise promise;
     std::stringstream completeUrlStream;
 
+    const auto &url = serviceNameResolver_.resolveHost();
     if (nsName->isV2()) {
-        completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/'
+        completeUrlStream << url << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/'
                           << "topics";
     } else {
-        completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/'
+        completeUrlStream << url << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/'
                           << "destinations";
     }
 
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h b/pulsar-client-cpp/lib/HTTPLookupService.h
index 3d0d39ee90a..f401e879a5b 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.h
+++ b/pulsar-client-cpp/lib/HTTPLookupService.h
@@ -23,6 +23,7 @@
 #include <lib/ClientImpl.h>
 #include <lib/Url.h>
 #include <lib/VersionInternal.h>
+#include <lib/ServiceNameResolver.h>
 
 namespace pulsar {
 class HTTPLookupService : public LookupService, public std::enable_shared_from_this<HTTPLookupService> {
@@ -42,7 +43,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
     typedef Promise<Result, LookupDataResultPtr> LookupPromise;
 
     ExecutorServiceProviderPtr executorProvider_;
-    std::string adminUrl_;
+    ServiceNameResolver& serviceNameResolver_;
     AuthenticationPtr authenticationPtr_;
     int lookupTimeoutInSeconds_;
     std::string tlsTrustCertsFilePath_;
@@ -60,13 +61,13 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
     Result sendHTTPRequest(std::string completeUrl, std::string& responseData);
 
    public:
-    HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&);
+    HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const AuthenticationPtr&);
 
-    Future<Result, LookupDataResultPtr> lookupAsync(const std::string&);
+    LookupResultFuture getBroker(const TopicName& topicName) override;
 
-    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr&);
+    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr&) override;
 
-    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName);
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h
index 2fdb51a2946..50f2d84f876 100644
--- a/pulsar-client-cpp/lib/LookupService.h
+++ b/pulsar-client-cpp/lib/LookupService.h
@@ -25,6 +25,7 @@
 #include <lib/LogUtils.h>
 #include <lib/TopicName.h>
 
+#include <iostream>
 #include <vector>
 
 namespace pulsar {
@@ -34,12 +35,26 @@ typedef std::shared_ptr<Promise<Result, NamespaceTopicsPtr>> NamespaceTopicsProm
 
 class LookupService {
    public:
-    /*
-     * @param    topicName - topic name
+    struct LookupResult {  // the value that the future returned by getBroker() is completed with
+        std::string logicalAddress;   // the logical address of the broker that serves the topic
+        std::string physicalAddress;  // the physical address of the broker that serves the topic
+        // Prints both addresses like "logical address: A, physical address: B"
+        friend std::ostream& operator<<(std::ostream& os, const LookupResult& lookupResult) {
+            return os << "logical address: " << lookupResult.logicalAddress
+                      << ", physical address: " << lookupResult.physicalAddress;
+        }
+    };
+    using LookupResultFuture = Future<Result, LookupResult>;
+    using LookupResultPromise = Promise<Result, LookupResult>;
+
+    /**
+     * Call broker lookup-api to get broker which serves namespace bundle that contains the given topic.
      *
-     * Looks up the owner broker for the given topic name
+     * @param topicName the topic name
+     * @return a pair of addresses, representing the logical and physical addresses of the broker that serves
+     * the topic
      */
-    virtual Future<Result, LookupDataResultPtr> lookupAsync(const std::string& topicName) = 0;
+    virtual LookupResultFuture getBroker(const TopicName& topicName) = 0;
 
     /*
      * @param    topicName - pointer to topic name
diff --git a/pulsar-client-cpp/lib/PulsarScheme.h b/pulsar-client-cpp/lib/PulsarScheme.h
new file mode 100644
index 00000000000..e292687275f
--- /dev/null
+++ b/pulsar-client-cpp/lib/PulsarScheme.h
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <stdexcept>
+#include <string>
+
+namespace pulsar {
+
+enum PulsarScheme  // the schemes that are accepted in a service URL
+{
+    PULSAR,      // "pulsar://", whose default port is 6650
+    PULSAR_SSL,  // "pulsar+ssl://", whose default port is 6651
+    HTTP,        // "http://", whose default port is 8080
+    HTTPS        // "https://", whose default port is 8081
+};
+
+namespace scheme {
+
+/**
+ * Converts the scheme part of a service URL, i.e. the text before "://", to a PulsarScheme.
+ * The comparison is case-sensitive.
+ *
+ * @throws std::invalid_argument if `scheme` is none of "pulsar", "pulsar+ssl", "http" and "https"
+ */
+inline PulsarScheme toScheme(const std::string& scheme) {
+    if (scheme == "pulsar") return PulsarScheme::PULSAR;
+    if (scheme == "pulsar+ssl") return PulsarScheme::PULSAR_SSL;
+    if (scheme == "http") return PulsarScheme::HTTP;
+    if (scheme == "https") return PulsarScheme::HTTPS;
+    throw std::invalid_argument("Invalid scheme: " + scheme);
+}
+
+// Returns the prefix of an address with the given scheme, which ends with "://".
+// A value that is not a valid PulsarScheme is converted to "unknown://".
+inline const char* getSchemeString(PulsarScheme scheme) {
+    if (scheme == PulsarScheme::PULSAR) {
+        return "pulsar://";
+    } else if (scheme == PulsarScheme::PULSAR_SSL) {
+        return "pulsar+ssl://";
+    } else if (scheme == PulsarScheme::HTTP) {
+        return "http://";
+    } else if (scheme == PulsarScheme::HTTPS) {
+        return "https://";
+    }
+    return "unknown://";
+}
+
+// Returns the port that is used when an address with the given scheme does not specify one.
+// Throws std::invalid_argument if `scheme` is not a valid PulsarScheme.
+inline short getDefaultPort(PulsarScheme scheme) {
+    if (scheme == PulsarScheme::PULSAR) {
+        return 6650;
+    } else if (scheme == PulsarScheme::PULSAR_SSL) {
+        return 6651;
+    } else if (scheme == PulsarScheme::HTTP) {
+        return 8080;
+    } else if (scheme == PulsarScheme::HTTPS) {
+        return 8081;
+    }
+    throw std::invalid_argument("Unexpected scheme: " + std::to_string(scheme));
+}
+
+}  // namespace scheme
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ServiceNameResolver.h b/pulsar-client-cpp/lib/ServiceNameResolver.h
new file mode 100644
index 00000000000..60351d8037f
--- /dev/null
+++ b/pulsar-client-cpp/lib/ServiceNameResolver.h
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <assert.h>
+#include <atomic>
+#include "ServiceURI.h"
+
+namespace pulsar {
+
+class ServiceNameResolver {  // resolves the address to use from a service URL with 1 or more hosts
+   public:
+    ServiceNameResolver(const std::string& uriString)
+        : serviceUri_(uriString), numAddresses_(serviceUri_.getServiceHosts().size()) {
+        assert(numAddresses_ > 0);  // ServiceURI has already rejected a URL without any address
+    }
+    ServiceNameResolver(const ServiceNameResolver&) = delete;
+    ServiceNameResolver& operator=(const ServiceNameResolver&) = delete;
+
+    bool useTls() const noexcept {  // true if the scheme is "pulsar+ssl" or "https"
+        const auto scheme = serviceUri_.getScheme();
+        return scheme == PulsarScheme::PULSAR_SSL || scheme == PulsarScheme::HTTPS;
+    }
+
+    bool useHttp() const noexcept {  // true if the scheme is "http" or "https"
+        const auto scheme = serviceUri_.getScheme();
+        return scheme == PulsarScheme::HTTP || scheme == PulsarScheme::HTTPS;
+    }
+
+    const std::string& resolveHost() {  // selects the next address in a round robin way
+        const auto& hosts = serviceUri_.getServiceHosts();
+        return (numAddresses_ == 1) ? hosts[0] : hosts[index_++ % numAddresses_];
+    }
+
+   private:
+    const ServiceURI serviceUri_;
+    const size_t numAddresses_;
+    std::atomic_size_t index_{0};
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ServiceURI.cc b/pulsar-client-cpp/lib/ServiceURI.cc
new file mode 100644
index 00000000000..ec515b2444a
--- /dev/null
+++ b/pulsar-client-cpp/lib/ServiceURI.cc
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ServiceURI.h"
+#include <stdexcept>
+
+namespace pulsar {
+
+// Rewrites `address`, which is like "host[:port][/path]", in place as "<scheme>://host:port". An empty
+// entry is left empty so that the caller can drop it. Throws std::invalid_argument for an invalid address.
+static void processAddress(std::string& address, PulsarScheme scheme) {
+    address = address.substr(0, address.find('/'));  // ignore the path part
+    if (address.empty()) {
+        return;  // e.g. the gap in "host1,,host2", it must not be turned into "<scheme>://:<default port>"
+    }
+    auto fail = [&address] { throw std::invalid_argument("invalid hostname: " + address); };
+
+    const auto posOfColon = address.find(':');
+    if (posOfColon == std::string::npos) {
+        address += ":" + std::to_string(scheme::getDefaultPort(scheme));
+    } else if (address.find(':', posOfColon + 1) != std::string::npos) {
+        fail();
+    } else {
+        const auto portString = address.substr(posOfColon + 1);
+        try {
+            size_t numParsed = 0;
+            const auto port = std::stoi(portString, &numParsed);
+            if (numParsed != portString.size() || port < 0 || port > 65535) {
+                fail();  // trailing characters like "6650abc", or not in the range of a port
+            }
+        } catch (const std::logic_error&) {  // std::invalid_argument or std::out_of_range
+            fail();
+        }
+    }
+    address = scheme::getSchemeString(scheme) + address;
+}
+
+/**
+ * Splits a service URL like "pulsar://host1:6650,host2/path" into the scheme and the addresses.
+ *
+ * @param uriString the service URL, the hosts after "://" are separated by ','
+ * @return the scheme and the non-empty addresses that are normalized by processAddress()
+ * @throws std::invalid_argument if the scheme or the authority is missing, or a part of the URL is invalid
+ */
+auto ServiceURI::parse(const std::string& uriString) -> DataType {
+    const size_t schemeEnd = uriString.find("://");
+    if (schemeEnd == std::string::npos) {
+        throw std::invalid_argument("The scheme part is missing: " + uriString);
+    } else if (schemeEnd == 0) {
+        throw std::invalid_argument("Expected scheme name at index 0: " + uriString);
+    }
+    const auto scheme = scheme::toScheme(uriString.substr(0, schemeEnd));
+
+    size_t begin = schemeEnd + 3;  // the index of the first character after "://"
+    if (begin < uriString.size() && uriString[begin] == '/') {
+        throw std::invalid_argument("authority component is missing in service uri: " + uriString);
+    }
+
+    // Split the remaining part by ','; an item might still contain a path, which processAddress() removes
+    std::vector<std::string> items;
+    while (begin < uriString.size()) {
+        const size_t end = uriString.find(',', begin);
+        items.emplace_back(uriString.substr(begin, end - begin));
+        if (end == std::string::npos) {
+            break;
+        }
+        begin = end + 1;
+    }
+
+    // Normalize all the items before collecting the non-empty ones
+    for (auto& item : items) {
+        processAddress(item, scheme);
+    }
+    std::vector<std::string> addresses;
+    for (const auto& item : items) {
+        if (!item.empty()) {
+            addresses.emplace_back(item);
+        }
+    }
+    if (addresses.empty()) {
+        throw std::invalid_argument("No service url is provided yet");
+    }
+    return std::make_pair(scheme, addresses);
+}
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ServiceURI.h b/pulsar-client-cpp/lib/ServiceURI.h
new file mode 100644
index 00000000000..4f459d987f6
--- /dev/null
+++ b/pulsar-client-cpp/lib/ServiceURI.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+#include <vector>
+#include "PulsarScheme.h"
+
+namespace pulsar {
+
+class ServiceURI {  // the immutable result of parsing a service URL that might contain multiple hosts
+   public:
+    /**
+     * @param uriString the URL string that is used to create a pulsar::Client object
+     * @throws std::invalid_argument if `uriString` is invalid
+     */
+    ServiceURI(const std::string& uriString) : data_(parse(uriString)) {}
+    // The scheme that is shared by all the addresses
+    PulsarScheme getScheme() const noexcept { return data_.first; }
+    // The addresses in the same order as in `uriString`, there is at least 1 address
+    const std::vector<std::string>& getServiceHosts() const noexcept { return data_.second; }
+
+   private:
+    // The 2 elements of the pair are:
+    // 1. The Scheme of the lookup protocol
+    // 2. The available addresses, each item is like "pulsar://localhost:6650"
+    using DataType = std::pair<PulsarScheme, std::vector<std::string>>;
+    const DataType data_;
+    // Parses `uriString` into the value of `data_`, it throws std::invalid_argument if `uriString` is invalid
+    static DataType parse(const std::string& uriString);
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc
index 2e6232cf001..70b7b7e5072 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -150,19 +150,19 @@ std::string TopicName::getEncodedName(const std::string& nameBeforeEncoding) {
     return nameAfterEncoding;
 }
 
-bool TopicName::isV2Topic() { return isV2Topic_; }
+bool TopicName::isV2Topic() const { return isV2Topic_; }
 
-std::string TopicName::getDomain() { return domain_; }
+std::string TopicName::getDomain() const { return domain_; }
 
-std::string TopicName::getProperty() { return property_; }
+std::string TopicName::getProperty() const { return property_; }
 
-std::string TopicName::getCluster() { return cluster_; }
+std::string TopicName::getCluster() const { return cluster_; }
 
-std::string TopicName::getNamespacePortion() { return namespacePortion_; }
+std::string TopicName::getNamespacePortion() const { return namespacePortion_; }
 
 std::string TopicName::getLocalName() { return localName_; }
 
-std::string TopicName::getEncodedLocalName() { return getEncodedName(localName_); }
+std::string TopicName::getEncodedLocalName() const { return getEncodedName(localName_); }
 
 bool TopicName::operator==(const TopicName& other) {
     return (this->topicName_.compare(other.topicName_) == 0);
diff --git a/pulsar-client-cpp/lib/TopicName.h b/pulsar-client-cpp/lib/TopicName.h
index 1d5deab553d..d8620ea1fee 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -47,14 +47,14 @@ class PULSAR_PUBLIC TopicName : public ServiceUnitId {
     int partition_ = -1;
 
    public:
-    bool isV2Topic();
+    bool isV2Topic() const;
     std::string getLookupName();
-    std::string getDomain();
-    std::string getProperty();
-    std::string getCluster();
-    std::string getNamespacePortion();
+    std::string getDomain() const;
+    std::string getProperty() const;
+    std::string getCluster() const;
+    std::string getNamespacePortion() const;
     std::string getLocalName();
-    std::string getEncodedLocalName();
+    std::string getEncodedLocalName() const;
     std::string toString() const;
     bool isPersistent() const;
     NamespaceNamePtr getNamespaceName();
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 127ecc4247c..43c7f4d3f3f 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -118,10 +118,8 @@ class PulsarTest(TestCase):
         self.assertEqual(conf.replicate_subscription_state_enabled(), True)
 
     def test_connect_error(self):
-        with self.assertRaises(pulsar.ConnectError):
-            client = Client("fakeServiceUrl")
-            client.create_producer("connect-error-topic")
-            client.close()
+        with self.assertRaises(ValueError):
+            Client("fakeServiceUrl")
 
     def test_exception_inheritance(self):
         assert issubclass(pulsar.ConnectError, pulsar.PulsarException)
diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh
index cd5e28814e6..d4e7ec7284d 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -47,7 +47,9 @@ if [ -f /gtest-parallel/gtest-parallel ]; then
     fi
     python3 /gtest-parallel/gtest-parallel $tests --dump_json_test_results=/tmp/gtest_parallel_results.json \
       --workers=$gtest_workers --retry_failed=$RETRY_FAILED -d /tmp \
-      ./main
+      ./main --gtest_filter='-CustomLoggerTest*'
     RES=$?
+    # The customized logger might affect other tests, so they run separately; keep a failure of either run
+    ./main --gtest_filter='CustomLoggerTest*' || RES=$?
 else
     ./main
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index da5c60952dd..6bb31d0490d 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -23,6 +23,7 @@
 #include <vector>
 #include <cstring>
 #include <sstream>
+#include <stdexcept>
 #include <algorithm>
 #include <functional>
 
@@ -496,28 +497,11 @@ TEST(BasicEndToEndTest, testSubscribeCloseUnsubscribeSherpaScenario) {
 }
 
 TEST(BasicEndToEndTest, testInvalidUrlPassed) {
-    Client client("localhost:4080");
-    std::string topicName = "testInvalidUrlPassed";
-    std::string subName = "test-sub";
-    Producer producer;
-    Result result = client.createProducer(topicName, producer);
-    ASSERT_EQ(ResultConnectError, result);
-
-    Client client1("test://localhost");
-    result = client1.createProducer(topicName, producer);
-    ASSERT_EQ(ResultConnectError, result);
-
-    Client client2("test://:4080");
-    result = client2.createProducer(topicName, producer);
-    ASSERT_EQ(ResultConnectError, result);
-
-    Client client3("");
-    result = client3.createProducer(topicName, producer);
-    ASSERT_EQ(ResultConnectError, result);
-
-    Client client4("Dream of the day when this will be a valid URL");
-    result = client4.createProducer(topicName, producer);
-    ASSERT_EQ(ResultConnectError, result);
+    EXPECT_THROW({ Client{"localhost:4080"}; }, std::invalid_argument);
+    EXPECT_THROW({ Client{"test://localhost"}; }, std::invalid_argument);
+    EXPECT_THROW({ Client{"test://:4080"}; }, std::invalid_argument);
+    EXPECT_THROW({ Client{""}; }, std::invalid_argument);
+    EXPECT_THROW({ Client{"Dream of the day when this will be a valid URL"}; }, std::invalid_argument);
 }
 
 void testPartitionedProducerConsumer(bool lazyStartPartitionedProducers, std::string topicName) {
diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc b/pulsar-client-cpp/tests/LookupServiceTest.cc
similarity index 61%
rename from pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
rename to pulsar-client-cpp/tests/LookupServiceTest.cc
index b880df3b298..14c5695a828 100644
--- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
+++ b/pulsar-client-cpp/tests/LookupServiceTest.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include <BinaryProtoLookupService.h>
+#include <HTTPLookupService.h>
 #include <pulsar/Client.h>
 
 #include <gtest/gtest.h>
@@ -26,17 +27,23 @@
 #include "HttpHelper.h"
 #include <pulsar/Authentication.h>
 #include <boost/exception/all.hpp>
+#include "LogUtils.h"
+
+#include <algorithm>
 
 using namespace pulsar;
 
-TEST(BinaryLookupServiceTest, basicLookup) {
+DECLARE_LOG_OBJECT()
+
+TEST(LookupServiceTest, basicLookup) {
     ExecutorServiceProviderPtr service = std::make_shared<ExecutorServiceProvider>(1);
     AuthenticationPtr authData = AuthFactory::Disabled();
     std::string url = "pulsar://localhost:6650";
     ClientConfiguration conf;
     ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
     ConnectionPool pool_(conf, ioExecutorProvider_, authData, true);
-    BinaryProtoLookupService lookupService(pool_, url);
+    ServiceNameResolver serviceNameResolver(url);
+    BinaryProtoLookupService lookupService(serviceNameResolver, pool_, "");
 
     TopicNamePtr topicName = TopicName::get("topic");
 
@@ -46,15 +53,17 @@ TEST(BinaryLookupServiceTest, basicLookup) {
     ASSERT_TRUE(lookupData != NULL);
     ASSERT_EQ(0, lookupData->getPartitions());
 
-    Future<Result, LookupDataResultPtr> future = lookupService.lookupAsync("topic");
-    result = future.get(lookupData);
+    const auto topicNamePtr = TopicName::get("topic");
+    auto future = lookupService.getBroker(*topicNamePtr);
+    LookupService::LookupResult lookupResult;
+    result = future.get(lookupResult);
 
     ASSERT_EQ(ResultOk, result);
-    ASSERT_TRUE(lookupData != NULL);
-    ASSERT_EQ(url, lookupData->getBrokerUrl());
+    ASSERT_EQ(url, lookupResult.logicalAddress);
+    ASSERT_EQ(url, lookupResult.physicalAddress);
 }
 
-TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) {
+TEST(LookupServiceTest, basicGetNamespaceTopics) {
     std::string url = "pulsar://localhost:6650";
     std::string adminUrl = "http://localhost:8080/";
     Result result;
@@ -98,7 +107,8 @@ TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) {
     ClientConfiguration conf;
     ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
     ConnectionPool pool_(conf, ioExecutorProvider_, authData, true);
-    BinaryProtoLookupService lookupService(pool_, url);
+    ServiceNameResolver serviceNameResolver(url);
+    BinaryProtoLookupService lookupService(serviceNameResolver, pool_, "");
 
     TopicNamePtr topicName = TopicName::get(topicName1);
     NamespaceNamePtr nsName = topicName->getNamespaceName();
@@ -117,3 +127,55 @@ TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) {
 
     client.shutdown();
 }
+
+static void testMultiAddresses(LookupService& lookupService) {  // runs 3 kinds of requests, 6 times each
+    std::vector<Result> results;
+    constexpr int numRequests = 6;
+    // NOTE(review): presumably only 1 of the 2 addresses configured by the caller is reachable
+    auto verifySuccessCount = [&results] {
+        // Only half of them succeeded, the other half failed with ResultConnectError
+        ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultOk), numRequests / 2);
+        ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultConnectError), numRequests / 2);
+    };
+    // 1. Look up the broker of the topic
+    for (int i = 0; i < numRequests; i++) {
+        const auto topicNamePtr = TopicName::get("topic");
+        LookupService::LookupResult lookupResult;
+        const auto result = lookupService.getBroker(*topicNamePtr).get(lookupResult);
+        LOG_INFO("getBroker [" << i << "] " << result << ", " << lookupResult);
+        results.emplace_back(result);
+    }
+    verifySuccessCount();
+    // 2. Get the partition metadata of the topic
+    results.clear();
+    for (int i = 0; i < numRequests; i++) {
+        LookupDataResultPtr data;
+        const auto result = lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data);
+        LOG_INFO("getPartitionMetadataAsync [" << i << "] " << result);
+        results.emplace_back(result);
+    }
+    verifySuccessCount();
+    // 3. Get the topics of the namespace that the topic belongs to
+    results.clear();
+    for (int i = 0; i < numRequests; i++) {
+        NamespaceTopicsPtr data;
+        const auto result =
+            lookupService.getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName()).get(data);
+        LOG_INFO("getTopicsOfNamespaceAsync [" << i << "] " << result);
+        results.emplace_back(result);
+    }
+    verifySuccessCount();
+}
+
+TEST(LookupServiceTest, testMultiAddresses) {  // covers both the binary and the HTTP lookup services
+    ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), AuthFactory::Disabled(), true);
+    ServiceNameResolver serviceNameResolver("pulsar://localhost,localhost:9999");  // 1st host: port 6650
+    BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, "");
+    testMultiAddresses(binaryLookupService);
+
+    // HTTPLookupService calls shared_from_this() internally, we must create a shared pointer to test
+    ServiceNameResolver serviceNameResolverForHttp("http://localhost,localhost:9999");  // 1st host: port 8080
+    auto httpLookupServicePtr = std::make_shared<HTTPLookupService>(
+        std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled());
+    testMultiAddresses(*httpLookupServicePtr);
+}
diff --git a/pulsar-client-cpp/tests/ServiceURITest.cc b/pulsar-client-cpp/tests/ServiceURITest.cc
new file mode 100644
index 00000000000..9d4c88fc497
--- /dev/null
+++ b/pulsar-client-cpp/tests/ServiceURITest.cc
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include "lib/ServiceURI.h"
+
+using namespace pulsar;
+
+// Expects that constructing a ServiceURI from `uriString` throws std::invalid_argument with `errorMsg`
+static void verifyServiceURIFailure(const std::string& uriString, const std::string& errorMsg) {
+    try {
+        ServiceURI uri{uriString};
+        FAIL() << uriString << " should be invalid";  // reported together with the test failure
+    } catch (const std::invalid_argument& e) {
+        EXPECT_EQ(errorMsg, e.what());
+    }
+}
+
+static void verifyServiceURI(const std::string& uriString, PulsarScheme expectedScheme,
+                             const std::vector<std::string>& expectedServiceHosts) {
+    ServiceURI uri{uriString};  // an exception here means the URL is unexpectedly invalid
+    EXPECT_EQ(uri.getScheme(), expectedScheme);
+    EXPECT_EQ(uri.getServiceHosts(), expectedServiceHosts);  // the order of the hosts matters
+}
+
+TEST(ServiceURITest, testInvalidServiceUris) {  // each case: the URL and the expected error message
+    verifyServiceURIFailure("localhost:6650", "The scheme part is missing: localhost:6650");
+    verifyServiceURIFailure("unknown://localhost:6650", "Invalid scheme: unknown");
+    verifyServiceURIFailure("://localhost:6650", "Expected scheme name at index 0: ://localhost:6650");
+    verifyServiceURIFailure("pulsar:///", "authority component is missing in service uri: pulsar:///");
+    verifyServiceURIFailure("pulsar://localhost:6650:6651", "invalid hostname: localhost:6650:6651");
+    verifyServiceURIFailure("pulsar://localhost:xyz/", "invalid hostname: localhost:xyz");  // not a number
+    verifyServiceURIFailure("pulsar://localhost:-6650/", "invalid hostname: localhost:-6650");  // negative
+}
+
+TEST(ServiceURITest, testPathIgnored) {  // a trailing '/' is a path, which is not a part of the address
+    verifyServiceURI("pulsar://localhost:6650", PulsarScheme::PULSAR, {"pulsar://localhost:6650"});
+    verifyServiceURI("pulsar://localhost:6650/", PulsarScheme::PULSAR, {"pulsar://localhost:6650"});
+}
+
+TEST(ServiceURITest, testMultipleHostsComma) {  // the path after the last host is ignored
+    verifyServiceURI("pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace", PulsarScheme::PULSAR,
+                     {"pulsar://host1:6650", "pulsar://host2:6650", "pulsar://host3:6650"});
+}
+
+TEST(ServiceURITest, testMultipleHostsWithoutPulsarPorts) {  // each scheme has its own default port
+    verifyServiceURI("pulsar://host1,host2,host3/path/to/namespace", PulsarScheme::PULSAR,
+                     {"pulsar://host1:6650", "pulsar://host2:6650", "pulsar://host3:6650"});
+    verifyServiceURI("pulsar+ssl://host1,host2,host3/path/to/namespace", PulsarScheme::PULSAR_SSL,
+                     {"pulsar+ssl://host1:6651", "pulsar+ssl://host2:6651", "pulsar+ssl://host3:6651"});
+    verifyServiceURI("http://host1,host2,host3/path/to/namespace", PulsarScheme::HTTP,
+                     {"http://host1:8080", "http://host2:8080", "http://host3:8080"});
+    verifyServiceURI("https://host1,host2,host3/path/to/namespace", PulsarScheme::HTTPS,
+                     {"https://host1:8081", "https://host2:8081", "https://host3:8081"});
+}
+
+TEST(ServiceURITest, testMultipleHostsMixed) {  // only host2 falls back to the default port
+    verifyServiceURI("pulsar://host1:6640,host2,host3:6660/path/to/namespace", PulsarScheme::PULSAR,
+                     {"pulsar://host1:6640", "pulsar://host2:6650", "pulsar://host3:6660"});
+}